Spaces:
Running
Running
| import gradio as gr | |
| import requests | |
| import os | |
| import time | |
| import json | |
| import threading | |
# Suno API key, read once at import time from the "SunoKey" environment variable.
SUNO_KEY = os.getenv("SunoKey", "")
if not SUNO_KEY:
    # Only warn here; request helpers check the key again before use.
    print("⚠️ SunoKey not set!")

# Details of the most recently submitted separation task (used for auto-fill).
current_task_info = dict()
# Module-level polling registry; starts empty.
polling_active = dict()
def _describe_audio(index, audio):
    """Build one ``(label, audio_id)`` dropdown entry for a generated track.

    Missing or null ``duration``/``prompt`` values are tolerated so that one
    incomplete track does not make the whole lookup fail.
    """
    audio_id = audio.get("id", f"audio_{index}")
    title = audio.get("title", f"Track {index + 1}")
    try:
        duration = float(audio.get("duration") or 0)
    except (TypeError, ValueError):
        duration = 0.0
    label = f"{index + 1}. {title} ({duration:.1f}s)"
    prompt = str(audio.get("prompt") or "")
    if prompt:
        snippet = prompt[:50]
        # Only mark the snippet as shortened when something was cut off.
        if len(prompt) > 50:
            snippet += "..."
        label += f" - {snippet}"
    return label, audio_id


def get_audio_files(task_id):
    """Look up the audio files produced by a Suno generation task.

    Args:
        task_id: Suno generation task ID as typed by the user; surrounding
            whitespace is ignored.

    Returns:
        A ``(message, options)`` tuple. ``message`` is a user-facing status
        string; ``options`` is a list of ``(label, audio_id)`` tuples, empty
        whenever the lookup did not yield any audio.
    """
    if not SUNO_KEY:
        return "❌ Error: SunoKey not configured", []
    if not task_id or not task_id.strip():
        return "❌ Please enter a Task ID", []
    task_id = task_id.strip()

    try:
        resp = requests.get(
            "https://api.sunoapi.org/api/v1/generate/record-info",
            headers={"Authorization": f"Bearer {SUNO_KEY}"},
            params={"taskId": task_id},
            timeout=30,
        )
        if resp.status_code != 200:
            return f"❌ Request failed: HTTP {resp.status_code}", []

        data = resp.json()
        if data.get("code") != 200:
            return f"❌ API error: {data.get('msg', 'Unknown')}", []

        # "data" / "response" may be present but null; treat that as empty.
        task = data.get("data") or {}
        status = task.get("status", "UNKNOWN")
        if status != "SUCCESS":
            return f"⏳ Task status: {status}. Wait for generation to complete.", []

        suno_data = (task.get("response") or {}).get("sunoData") or []
        if not suno_data:
            return "❌ No audio files found", []

        audio_options = [
            _describe_audio(index, audio) for index, audio in enumerate(suno_data)
        ]
        return f"✅ Found {len(audio_options)} audio file(s)", audio_options
    except Exception as e:
        # UI boundary: report any failure as a message instead of raising.
        return f"❌ Error: {str(e)}", []
def submit_separation_task(task_id, audio_id, separation_type):
    """Send a vocal-removal request for one audio file of a generation task.

    On success the submitted task's details are recorded in the module-level
    ``current_task_info`` dict.

    Returns:
        A ``(message, separation_task_id)`` tuple; the ID is ``None`` when
        the submission failed or the response carried no task ID.
    """
    try:
        payload = {
            "taskId": task_id,
            "audioId": audio_id,
            "type": separation_type,
            "callBackUrl": "https://1hit.no/callback.php",
        }
        request_headers = {
            "Authorization": f"Bearer {SUNO_KEY}",
            "Content-Type": "application/json",
        }
        resp = requests.post(
            "https://api.sunoapi.org/api/v1/vocal-removal/generate",
            json=payload,
            headers=request_headers,
            timeout=30,
        )
        if resp.status_code != 200:
            return f"❌ HTTP Error {resp.status_code}:\n{resp.text}", None

        data = resp.json()

        # The task ID is accepted either at the top level or nested under
        # "data" of a code-200 envelope.
        if "taskId" in data:
            separation_task_id = data["taskId"]
        elif data.get("code") == 200 and "data" in data and "taskId" in data["data"]:
            separation_task_id = data["data"]["taskId"]
        else:
            separation_task_id = None

        if not separation_task_id:
            return f"❌ No task ID in response:\n{json.dumps(data, indent=2)}", None

        # Store for auto-fill
        current_task_info.update(
            {
                "separation_task_id": separation_task_id,
                "original_task_id": task_id,
                "audio_id": audio_id,
                "separation_type": separation_type,
                "start_time": time.time(),
                "attempt": 0,
            }
        )
        return (
            f"✅ Task submitted!\n\n**Separation Task ID:** `{separation_task_id}`\n\n⏳ Starting auto-polling...",
            separation_task_id,
        )
    except Exception as e:
        return f"❌ Error: {str(e)}", None
def poll_separation_status(separation_task_id):
    """Query the vocal-removal endpoint once for a separation task.

    Args:
        separation_task_id: ID returned when the separation was submitted.

    Returns:
        A ``(status, results, error)`` tuple. ``status`` is the status string
        found in the response (``"UNKNOWN"`` if none, ``"ERROR"`` on
        failure), ``results`` is the ``vocal_removal_info`` mapping or ``{}``,
        and ``error`` is ``None`` unless the request failed.
    """
    try:
        resp = requests.get(
            "https://api.sunoapi.org/api/v1/vocal-removal/record-info",
            headers={"Authorization": f"Bearer {SUNO_KEY}"},
            params={"taskId": separation_task_id},
            timeout=30,
        )
        if resp.status_code != 200:
            return "ERROR", {}, f"HTTP {resp.status_code}: {resp.text}"

        data = resp.json()
        nested = data.get("data")

        # An envelope such as {"code": ..., "msg": ..., "data": null} carries
        # no task record. Report the API's own code/message instead of letting
        # a membership test on None raise a cryptic TypeError.
        if "data" in data and not isinstance(nested, dict) and "status" not in data:
            return (
                "ERROR",
                {},
                f"API response has no task data (code {data.get('code', 'unknown')}): "
                f"{data.get('msg', 'Unknown')}",
            )
        if not isinstance(nested, dict):
            nested = {}

        # Status: a top-level field wins over the nested one.
        if "status" in data:
            status = data["status"]
        elif "status" in nested:
            status = nested["status"]
        elif data.get("code") == 200:
            # NOTE(review): a code-200 envelope without any "status" field is
            # treated as finished. If the API reports progress under another
            # key, a pending task would be shown as complete — confirm
            # against the API reference.
            status = "SUCCESS"
        else:
            status = "UNKNOWN"

        # Results: same lookup order as the status.
        if "vocal_removal_info" in data:
            results = data["vocal_removal_info"]
        elif "vocal_removal_info" in nested:
            results = nested["vocal_removal_info"]
        else:
            results = {}

        return status, results, None
    except Exception as e:
        # UI boundary: surface any failure as an error tuple.
        return "ERROR", {}, str(e)
def format_download_links(results, separation_task_id):
    """Render the separation results as a Markdown list of download links.

    Each stem URL present (and non-empty) in ``results`` is listed exactly
    once, vocals and instrumental first, followed by a link to the viewer
    page for the task.

    Args:
        results: Mapping of ``*_url`` keys to download URLs; may be empty
            or ``None``.
        separation_task_id: Task ID embedded in the viewer link.

    Returns:
        Markdown text, or ``"No download links found"`` when ``results`` is
        empty.
    """
    if not results:
        return "No download links found"

    # One ordered table for both 2-stem and 12-stem results. Vocals and
    # instrumental previously lived in a separate section *and* in this
    # table, which printed those two links twice.
    stem_fields = (
        ("vocal_url", "🎤 Vocals"),
        ("instrumental_url", "🎵 Instrumental"),
        ("backing_vocals_url", "🎤 Backing Vocals"),
        ("bass_url", "🎸 Bass"),
        ("brass_url", "🎺 Brass"),
        ("drums_url", "🥁 Drums"),
        ("fx_url", "🎛️ FX/Other"),
        ("guitar_url", "🎸 Guitar"),
        ("keyboard_url", "🎹 Keyboard"),
        ("percussion_url", "🪘 Percussion"),
        ("strings_url", "🎻 Strings"),
        ("synth_url", "🎹 Synth"),
        ("woodwinds_url", "🎷 Woodwinds"),
    )

    parts = ["## 🎵 Download Links\n\n"]
    parts.extend(
        f"**{name}:** [Download MP3]({results[field]})\n"
        for field, name in stem_fields
        if results.get(field)
    )
    parts.append(
        f"\n**🔗 Viewer:** [Open in Viewer](https://1hit.no/viewer.php?task_id={separation_task_id})"
    )
    return "".join(parts)
# Manual polling function
def manual_poll(task_id):
    """Check a separation task once and describe its state as Markdown.

    Args:
        task_id: Separation task ID as typed by the user. Surrounding
            whitespace is ignored, matching ``get_audio_files``.

    Returns:
        A Markdown status message; on success it includes the download
        links (or a viewer link when the response carried none).
    """
    if isinstance(task_id, str):
        # A pasted ID often carries stray spaces or a newline.
        task_id = task_id.strip()
    if not task_id:
        return "❌ Enter Task ID"

    status, results, error = poll_separation_status(task_id)
    if error:
        return f"❌ Error: {error}"

    if status == "SUCCESS":
        if results:
            output = "✅ **Complete!**\n\n"
            output += f"**Task ID:** `{task_id}`\n\n"
            output += format_download_links(results, task_id)
        else:
            output = "✅ **Complete** (no direct links)\n\n"
            output += f"**Task ID:** `{task_id}`\n\n"
            output += f"Check: [Viewer](https://1hit.no/viewer.php?task_id={task_id})"
    elif status in ["PENDING", "PROCESSING", "RUNNING"]:
        output = f"⏳ **Status:** {status}\n\n"
        output += f"**Task ID:** `{task_id}`\n\n"
        output += "Still processing. Try again in 30 seconds."
    elif status == "FAILED":
        output = "❌ **Failed**\n\n"
        output += f"**Task ID:** `{task_id}`\n"
        output += f"**Status:** {status}"
    else:
        output = f"🔄 **Status:** {status}\n\n"
        output += f"**Task ID:** `{task_id}`"
    return output
# Create the app
with gr.Blocks() as app:
    # Auto-polling budget: 60 polls, 5 seconds apart (about 5 minutes).
    MAX_POLL_ATTEMPTS = 60
    POLL_INTERVAL_SECONDS = 5

    gr.Markdown("# 🎵 Suno Stem Separator")

    with gr.Row():
        # Left column: input and control
        with gr.Column(scale=1):
            # Step 1: Get audio files
            with gr.Group():
                gr.Markdown("### 1. Get Audio Files")
                original_task_id = gr.Textbox(
                    label="Original Task ID",
                    placeholder="Enter Suno generation task ID",
                    info="From your Suno history",
                )
                get_audio_btn = gr.Button("📥 Get Audio Files", variant="secondary")
                audio_status = gr.Markdown("Enter Task ID above")

            # Step 2: Select audio file
            with gr.Group():
                gr.Markdown("### 2. Select Audio File")
                audio_dropdown = gr.Dropdown(
                    label="Select Audio",
                    choices=[],
                    interactive=True,
                    visible=False,
                )

            # Step 3: Start separation
            with gr.Group():
                gr.Markdown("### 3. Start Separation")
                separation_type = gr.Radio(
                    label="Separation Type",
                    choices=[
                        ("🎤 Vocals Only (1 credit)", "separate_vocal"),
                        ("🎛️ Full Stems (5 credits)", "split_stem"),
                    ],
                    value="separate_vocal",
                    visible=False,
                )
                submit_btn = gr.Button("🚀 Start Separation", variant="primary", visible=False)
                submission_output = gr.Markdown("Select audio file first", visible=False)

        # Right column: results and polling
        with gr.Column(scale=2):
            # Auto-polling section
            with gr.Group():
                gr.Markdown("### 4. Auto-Polling Status")
                auto_poll_status = gr.Markdown("Waiting for task submission...")
                auto_poll_progress = gr.Slider(
                    minimum=0,
                    maximum=MAX_POLL_ATTEMPTS,
                    value=0,
                    label="Polling attempts",
                    interactive=False,
                    visible=False,
                )
                stop_poll_btn = gr.Button("⏹️ Stop Auto-Polling", variant="stop", visible=False)

            # Manual polling section
            with gr.Group():
                gr.Markdown("### 5. Manual Polling")
                poll_task_id = gr.Textbox(
                    label="Separation Task ID",
                    placeholder="Will auto-fill from current task",
                )
                poll_btn = gr.Button("🔍 Check Status", variant="secondary")
                poll_output = gr.Markdown("Enter Task ID to check")

            # Results section
            with gr.Group():
                gr.Markdown("### 6. Download Links")
                download_output = gr.Markdown("Results will appear here")

            # Viewer link
            with gr.Group():
                gr.Markdown("### 7. Viewer")
                viewer_link = gr.Markdown(
                    "[Open Viewer](https://1hit.no/viewer.php)",
                    elem_id="viewer_link",
                )

    # Per-session state: the separation task being tracked and whether
    # auto-polling is currently running for it.
    current_separation_task_id = gr.State(value="")
    polling_active_flag = gr.State(value=False)
    # Drives auto-polling; inactive until a task has been submitted.
    poll_timer = gr.Timer(POLL_INTERVAL_SECONDS, active=False)
    # Unused; retained so the module namespace stays as it was.
    polling_thread = None

    # Step 1: Get audio files
    def on_get_audio(task_id):
        """Fetch the task's audio files and reveal the separation controls.

        Returns updates for: status text, audio dropdown, separation-type
        radio, submit button, submission message. The controls stay hidden
        when no audio was found.
        """
        def hidden_controls(message):
            return (
                message,
                gr.Dropdown(choices=[], visible=False),
                gr.Radio(visible=False),
                gr.Button(visible=False),
                gr.Markdown(visible=False),
            )

        if not task_id:
            return hidden_controls("❌ Enter Task ID")
        status, options = get_audio_files(task_id)
        if not options:
            return hidden_controls(status)
        return (
            status,
            gr.Dropdown(choices=options, value=options[0][1], visible=True),
            gr.Radio(visible=True),
            gr.Button(visible=True),
            gr.Markdown("Ready to separate!", visible=True),
        )

    # Step 2-3: Submit separation task
    def on_submit(task_id, audio_id, sep_type, active_flag):
        """Submit the separation request and, on success, start auto-polling.

        Returns updates for: submission message, separation task ID state,
        auto-poll status, progress slider, stop button, polling flag.
        """
        # get_audio_files looked the task up by its stripped ID; submit the
        # same value rather than the raw textbox content.
        task_id = task_id.strip() if isinstance(task_id, str) else task_id
        if not task_id or not audio_id:
            return (
                "❌ Missing Task ID or Audio ID",
                "",
                "⏳ Waiting...",
                gr.Slider(visible=False),
                gr.Button(visible=False),
                active_flag,
            )
        status, separation_task_id = submit_separation_task(task_id, audio_id, sep_type)
        if not separation_task_id:
            return (
                status,
                "",
                "⏳ Waiting...",
                gr.Slider(visible=False),
                gr.Button(visible=False),
                active_flag,
            )
        # Setting the flag to True activates the timer through the flag's
        # change handler below.
        return (
            status,
            separation_task_id,
            f"⏳ Polling started... (attempt 0/{MAX_POLL_ATTEMPTS})",
            gr.Slider(visible=True, value=0),
            gr.Button(visible=True),
            True,
        )

    def _poll_timeout_message(separation_task_id):
        """Status text shown once the attempt budget is used up."""
        return (
            "⏰ **Timeout after 5 minutes**\n\n"
            f"**Task ID:** `{separation_task_id}`\n"
            "Check manually or wait for callback."
        )

    # Auto-polling update function
    def update_auto_poll(separation_task_id, attempt, active_flag):
        """Poll the task once and compute the next auto-poll UI state.

        Returns ``(status_text, attempt, download_markdown, keep_polling)``.
        ``keep_polling`` is False once the task finished, failed, errored,
        or the attempt budget is exhausted.
        """
        if not active_flag or not separation_task_id:
            return "⏳ Polling stopped", attempt, "", False

        # The counter is round-tripped through the slider, which may hand
        # back a float.
        attempt = int(attempt or 0) + 1

        status, results, error = poll_separation_status(separation_task_id)
        if error:
            return f"❌ Poll error: {error}", attempt, "", False

        if status == "SUCCESS":
            if results:
                output = "✅ **Separation Complete!**\n\n"
                output += f"**Task ID:** `{separation_task_id}`\n\n"
                output += format_download_links(results, separation_task_id)
            else:
                output = "✅ **Processing Complete**\n\n"
                output += f"**Task ID:** `{separation_task_id}`\n\n"
                output += "Check callback results: "
                output += f"[Viewer](https://1hit.no/viewer.php?task_id={separation_task_id})"
            return f"✅ Complete! (attempt {attempt})", attempt, output, False

        if status == "FAILED":
            status_text = "❌ **Separation Failed**\n\n"
            status_text += f"**Task ID:** `{separation_task_id}`\n"
            status_text += f"**Status:** {status}"
            return status_text, attempt, "", False

        # Still running, or a status this app does not recognise: keep
        # polling until the budget runs out.
        if attempt >= MAX_POLL_ATTEMPTS:
            return _poll_timeout_message(separation_task_id), attempt, "", False

        if status in ["PENDING", "PROCESSING", "RUNNING"]:
            status_text = f"⏳ **Status:** {status}\n\n"
            status_text += f"**Task ID:** `{separation_task_id}`\n"
            status_text += f"**Attempt:** {attempt}/{MAX_POLL_ATTEMPTS}\n"
            status_text += f"**Elapsed:** {attempt * POLL_INTERVAL_SECONDS} seconds\n\n"
            status_text += f"Polling every {POLL_INTERVAL_SECONDS} seconds..."
        else:
            status_text = f"🔄 **Status:** {status}\n\n"
            status_text += f"**Task ID:** `{separation_task_id}`\n"
            status_text += f"**Attempt:** {attempt}/{MAX_POLL_ATTEMPTS}"
        return status_text, attempt, "", True

    # Stop polling function
    def stop_polling(active_flag):
        """Stop auto-polling and reset the auto-poll widgets."""
        return "⏹️ Polling stopped", 0, "", False, gr.Button(visible=False)

    # Connect events
    get_audio_btn.click(
        fn=on_get_audio,
        inputs=[original_task_id],
        outputs=[audio_status, audio_dropdown, separation_type, submit_btn, submission_output],
    )

    # Submit button starts polling
    submit_btn.click(
        fn=on_submit,
        inputs=[original_task_id, audio_dropdown, separation_type, polling_active_flag],
        outputs=[
            submission_output,
            current_separation_task_id,
            auto_poll_status,
            auto_poll_progress,
            stop_poll_btn,
            polling_active_flag,
        ],
    )

    def start_polling_thread(separation_task_id, attempt, active_flag):
        """Blocking poll loop; returns the number of attempts made.

        Sleeps 5 seconds between polls and stops when the task succeeds,
        fails, errors, or 60 attempts are reached. ``active_flag`` is read
        from the argument only, so the loop cannot be cancelled from the UI.
        Not connected to any event: auto-polling is driven by ``poll_timer``.
        Kept so the module namespace is unchanged.
        """
        while active_flag and attempt < 60:
            time.sleep(5)
            attempt += 1
            status, results, error = poll_separation_status(separation_task_id)
            if status == "SUCCESS" or status == "FAILED" or error:
                break
        return attempt

    # Manual polling
    poll_btn.click(
        fn=manual_poll,
        inputs=[poll_task_id],
        outputs=[poll_output],
    )

    # Stop polling button
    stop_poll_btn.click(
        fn=stop_polling,
        inputs=[polling_active_flag],
        outputs=[auto_poll_status, auto_poll_progress, download_output, polling_active_flag, stop_poll_btn],
    )

    # Auto-fill poll field when separation task is submitted
    def update_poll_field(separation_task_id):
        """Mirror the tracked separation task ID into the manual-poll box."""
        if separation_task_id:
            return separation_task_id
        return ""

    current_separation_task_id.change(
        fn=update_poll_field,
        inputs=[current_separation_task_id],
        outputs=[poll_task_id],
    )

    # Refresh-button handler: one poll on demand.
    def manual_refresh(separation_task_id, attempt, active_flag):
        """Poll once if auto-polling is active, else reset the status area."""
        if active_flag and separation_task_id:
            return update_auto_poll(separation_task_id, attempt, active_flag)
        return "⏳ Waiting for task...", 0, "", active_flag

    # Timer handler: the periodic poll that makes "auto-polling" automatic.
    def auto_poll_tick(separation_task_id, attempt, active_flag):
        """Poll once per timer tick while polling is active."""
        if not active_flag or not separation_task_id:
            # A tick can still be queued right after polling ended; leave
            # the final status and download links untouched.
            return gr.update(), gr.update(), gr.update(), active_flag
        return update_auto_poll(separation_task_id, attempt, active_flag)

    # Add a refresh button for manual polling updates
    refresh_btn = gr.Button("🔄 Refresh Status", variant="secondary", visible=False)

    # Show/hide refresh button based on polling state
    def toggle_refresh_button(active_flag):
        """Show the refresh button only while polling is active."""
        return gr.Button(visible=active_flag)

    def sync_polling_controls(active_flag):
        """Align refresh button, stop button and timer with the polling flag."""
        is_active = bool(active_flag)
        return (
            toggle_refresh_button(is_active),
            gr.Button(visible=is_active),
            gr.Timer(active=is_active),
        )

    polling_active_flag.change(
        fn=sync_polling_controls,
        inputs=[polling_active_flag],
        outputs=[refresh_btn, stop_poll_btn, poll_timer],
    )

    poll_timer.tick(
        fn=auto_poll_tick,
        inputs=[current_separation_task_id, auto_poll_progress, polling_active_flag],
        outputs=[auto_poll_status, auto_poll_progress, download_output, polling_active_flag],
    )

    refresh_btn.click(
        fn=manual_refresh,
        inputs=[current_separation_task_id, auto_poll_progress, polling_active_flag],
        outputs=[auto_poll_status, auto_poll_progress, download_output, polling_active_flag],
    )
# Launch the app
if __name__ == "__main__":
    key_state = "✅ Set" if SUNO_KEY else "❌ Not set"
    print("🚀 Starting Suno Stem Separator")
    print(f"🔑 SunoKey: {key_state}")
    print("🌐 Open your browser to: http://localhost:7860")

    # Title and theme are handed to launch() rather than to gr.Blocks().
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "title": "Suno Stem Separator",
        "theme": "soft",
    }
    app.launch(**launch_options)