import gradio as gr import requests import os import time import json import threading # Suno API key SUNO_KEY = os.environ.get("SunoKey", "") if not SUNO_KEY: print("⚠️ SunoKey not set!") # Store task info for auto-fill and polling current_task_info = {} polling_active = {} def get_audio_files(task_id): """Get audio files from Suno task ID""" if not SUNO_KEY: return "❌ Error: SunoKey not configured", [] if not task_id.strip(): return "❌ Please enter a Task ID", [] try: resp = requests.get( "https://api.sunoapi.org/api/v1/generate/record-info", headers={"Authorization": f"Bearer {SUNO_KEY}"}, params={"taskId": task_id.strip()}, timeout=30 ) if resp.status_code != 200: return f"❌ Request failed: HTTP {resp.status_code}", [] data = resp.json() if data.get("code") != 200: return f"❌ API error: {data.get('msg', 'Unknown')}", [] # Check status status = data.get("data", {}).get("status", "UNKNOWN") if status != "SUCCESS": return f"⏳ Task status: {status}. Wait for generation to complete.", [] # Get audio files suno_data = data.get("data", {}).get("response", {}).get("sunoData", []) if not suno_data: return "❌ No audio files found", [] # Create dropdown options audio_options = [] for i, audio in enumerate(suno_data): audio_id = audio.get("id", f"audio_{i}") title = audio.get("title", f"Track {i+1}") duration = audio.get("duration", 0) prompt = audio.get("prompt", "")[:50] display = f"{i+1}. {title} ({duration:.1f}s)" if prompt: display += f" - {prompt}..." audio_options.append((display, audio_id)) return f"✅ Found {len(audio_options)} audio file(s)", audio_options except Exception as e: return f"❌ Error: {str(e)}", [] def submit_separation_task(task_id, audio_id, separation_type): """Submit separation task""" try: resp = requests.post( "https://api.sunoapi.org/api/v1/vocal-removal/generate", json={ "taskId": task_id, "audioId": audio_id, "type": separation_type, "callBackUrl": "https://1hit.no/callback.php" }, headers={ "Authorization": f"Bearer {SUNO_KEY}", "Content-Type": "application/json" }, timeout=30 ) if resp.status_code == 200: data = resp.json() # Get separation task ID separation_task_id = None if "taskId" in data: separation_task_id = data["taskId"] elif data.get("code") == 200 and "data" in data and "taskId" in data["data"]: separation_task_id = data["data"]["taskId"] if separation_task_id: # Store for auto-fill current_task_info["separation_task_id"] = separation_task_id current_task_info["original_task_id"] = task_id current_task_info["audio_id"] = audio_id current_task_info["separation_type"] = separation_type current_task_info["start_time"] = time.time() current_task_info["attempt"] = 0 return f"✅ Task submitted!\n\n**Separation Task ID:** `{separation_task_id}`\n\n⏳ Starting auto-polling...", separation_task_id else: return f"❌ No task ID in response:\n{json.dumps(data, indent=2)}", None else: return f"❌ HTTP Error {resp.status_code}:\n{resp.text}", None except Exception as e: return f"❌ Error: {str(e)}", None def poll_separation_status(separation_task_id): """Poll separation task status""" try: resp = requests.get( "https://api.sunoapi.org/api/v1/vocal-removal/record-info", headers={"Authorization": f"Bearer {SUNO_KEY}"}, params={"taskId": separation_task_id}, timeout=30 ) if resp.status_code == 200: data = resp.json() # Get status status = "UNKNOWN" if "status" in data: status = data["status"] elif "data" in data and "status" in data["data"]: status = data["data"]["status"] elif data.get("code") == 200: status = "SUCCESS" # Get results results = {} if "vocal_removal_info" in data: results = data["vocal_removal_info"] elif "data" in data and "vocal_removal_info" in data["data"]: results = data["data"]["vocal_removal_info"] return status, results, None else: return "ERROR", {}, f"HTTP {resp.status_code}: {resp.text}" except Exception as e: return "ERROR", {}, str(e) def format_download_links(results, separation_task_id): """Format download links from results""" if not results: return "No download links found" output = "## 🎵 Download Links\n\n" # 2-stem separation if "vocal_url" in results or "instrumental_url" in results: if results.get("vocal_url"): output += f"**🎤 Vocals:** [Download MP3]({results['vocal_url']})\n" if results.get("instrumental_url"): output += f"**🎵 Instrumental:** [Download MP3]({results['instrumental_url']})\n" # 12-stem separation stem_fields = [ ("backing_vocals_url", "🎤 Backing Vocals"), ("bass_url", "🎸 Bass"), ("brass_url", "🎺 Brass"), ("drums_url", "🥁 Drums"), ("fx_url", "🎛️ FX/Other"), ("guitar_url", "🎸 Guitar"), ("keyboard_url", "🎹 Keyboard"), ("percussion_url", "🪘 Percussion"), ("strings_url", "🎻 Strings"), ("synth_url", "🎹 Synth"), ("woodwinds_url", "🎷 Woodwinds"), ("vocal_url", "🎤 Vocals"), ("instrumental_url", "🎵 Instrumental"), ] for field, name in stem_fields: if results.get(field): output += f"**{name}:** [Download MP3]({results[field]})\n" output += f"\n**🔗 Viewer:** [Open in Viewer](https://1hit.no/viewer.php?task_id={separation_task_id})" return output # Manual polling function def manual_poll(task_id): if not task_id: return "❌ Enter Task ID" status, results, error = poll_separation_status(task_id) if error: return f"❌ Error: {error}" if status == "SUCCESS": if results: output = f"✅ **Complete!**\n\n" output += f"**Task ID:** `{task_id}`\n\n" output += format_download_links(results, task_id) else: output = f"✅ **Complete** (no direct links)\n\n" output += f"**Task ID:** `{task_id}`\n\n" output += f"Check: [Viewer](https://1hit.no/viewer.php?task_id={task_id})" elif status in ["PENDING", "PROCESSING", "RUNNING"]: output = f"⏳ **Status:** {status}\n\n" output += f"**Task ID:** `{task_id}`\n\n" output += "Still processing. Try again in 30 seconds." elif status == "FAILED": output = f"❌ **Failed**\n\n" output += f"**Task ID:** `{task_id}`\n" output += f"**Status:** {status}" else: output = f"🔄 **Status:** {status}\n\n" output += f"**Task ID:** `{task_id}`" return output # Create the app with gr.Blocks() as app: gr.Markdown("# 🎵 Suno Stem Separator") with gr.Row(): # Left column: Input and control with gr.Column(scale=1): # Step 1: Get audio files with gr.Group(): gr.Markdown("### 1. Get Audio Files") original_task_id = gr.Textbox( label="Original Task ID", placeholder="Enter Suno generation task ID", info="From your Suno history" ) get_audio_btn = gr.Button("📥 Get Audio Files", variant="secondary") audio_status = gr.Markdown("Enter Task ID above") # Step 2: Select audio file with gr.Group(): gr.Markdown("### 2. Select Audio File") audio_dropdown = gr.Dropdown( label="Select Audio", choices=[], interactive=True, visible=False ) # Step 3: Start separation with gr.Group(): gr.Markdown("### 3. Start Separation") separation_type = gr.Radio( label="Separation Type", choices=[ ("🎤 Vocals Only (1 credit)", "separate_vocal"), ("🎛️ Full Stems (5 credits)", "split_stem") ], value="separate_vocal", visible=False ) submit_btn = gr.Button("🚀 Start Separation", variant="primary", visible=False) submission_output = gr.Markdown("Select audio file first", visible=False) # Right column: Results and polling with gr.Column(scale=2): # Auto-polling section with gr.Group(): gr.Markdown("### 4. Auto-Polling Status") auto_poll_status = gr.Markdown("Waiting for task submission...") auto_poll_progress = gr.Slider( minimum=0, maximum=60, value=0, label="Polling attempts", interactive=False, visible=False ) stop_poll_btn = gr.Button("⏹️ Stop Auto-Polling", variant="stop", visible=False) # Manual polling section with gr.Group(): gr.Markdown("### 5. Manual Polling") poll_task_id = gr.Textbox( label="Separation Task ID", placeholder="Will auto-fill from current task" ) poll_btn = gr.Button("🔍 Check Status", variant="secondary") poll_output = gr.Markdown("Enter Task ID to check") # Results section with gr.Group(): gr.Markdown("### 6. Download Links") download_output = gr.Markdown("Results will appear here") # Viewer link with gr.Group(): gr.Markdown("### 7. Viewer") viewer_link = gr.Markdown( "[Open Viewer](https://1hit.no/viewer.php)", elem_id="viewer_link" ) # Store current separation task ID for auto-polling current_separation_task_id = gr.State(value="") polling_active_flag = gr.State(value=False) polling_thread = None # Step 1: Get audio files def on_get_audio(task_id): if not task_id: return "❌ Enter Task ID", gr.Dropdown(choices=[], visible=False), gr.Radio(visible=False), gr.Button(visible=False), gr.Markdown(visible=False) status, options = get_audio_files(task_id) if not options: return status, gr.Dropdown(choices=[], visible=False), gr.Radio(visible=False), gr.Button(visible=False), gr.Markdown(visible=False) return ( status, gr.Dropdown(choices=options, value=options[0][1] if options else None, visible=True), gr.Radio(visible=True), gr.Button(visible=True), gr.Markdown("Ready to separate!", visible=True) ) # Step 2-3: Submit separation task def on_submit(task_id, audio_id, sep_type, active_flag): if not task_id or not audio_id: return "❌ Missing Task ID or Audio ID", "", "⏳ Waiting...", gr.Slider(visible=False), gr.Button(visible=False), active_flag status, separation_task_id = submit_separation_task(task_id, audio_id, sep_type) if not separation_task_id: return status, "", "⏳ Waiting...", gr.Slider(visible=False), gr.Button(visible=False), active_flag # Start auto-polling return ( status, separation_task_id, "⏳ Polling started... (attempt 0/60)", gr.Slider(visible=True, value=0), gr.Button(visible=True), True # Set polling active ) # Auto-polling update function def update_auto_poll(separation_task_id, attempt, active_flag): if not active_flag or not separation_task_id: return "⏳ Polling stopped", attempt, "", False attempt += 1 # Poll for status status, results, error = poll_separation_status(separation_task_id) if error: return f"❌ Poll error: {error}", attempt, "", False if status == "SUCCESS": if results: output = f"✅ **Separation Complete!**\n\n" output += f"**Task ID:** `{separation_task_id}`\n\n" output += format_download_links(results, separation_task_id) return f"✅ Complete! (attempt {attempt})", attempt, output, False else: output = f"✅ **Processing Complete**\n\n" output += f"**Task ID:** `{separation_task_id}`\n\n" output += "Check callback results: " output += f"[Viewer](https://1hit.no/viewer.php?task_id={separation_task_id})" return f"✅ Complete! (attempt {attempt})", attempt, output, False elif status in ["PENDING", "PROCESSING", "RUNNING"]: status_text = f"⏳ **Status:** {status}\n\n" status_text += f"**Task ID:** `{separation_task_id}`\n" status_text += f"**Attempt:** {attempt}/60\n" status_text += f"**Elapsed:** {attempt * 5} seconds\n\n" status_text += "Polling every 5 seconds..." if attempt >= 60: status_text = f"⏰ **Timeout after 5 minutes**\n\n" status_text += f"**Task ID:** `{separation_task_id}`\n" status_text += "Check manually or wait for callback." return status_text, attempt, "", False return status_text, attempt, "", True elif status == "FAILED": status_text = f"❌ **Separation Failed**\n\n" status_text += f"**Task ID:** `{separation_task_id}`\n" status_text += f"**Status:** {status}" return status_text, attempt, "", False else: status_text = f"🔄 **Status:** {status}\n\n" status_text += f"**Task ID:** `{separation_task_id}`\n" status_text += f"**Attempt:** {attempt}/60" if attempt >= 60: status_text = f"⏰ **Timeout after 5 minutes**\n\n" status_text += f"**Task ID:** `{separation_task_id}`\n" status_text += "Check manually or wait for callback." return status_text, attempt, "", False return status_text, attempt, "", True # Stop polling function def stop_polling(active_flag): return "⏹️ Polling stopped", 0, "", False, gr.Button(visible=False) # Connect events get_audio_btn.click( fn=on_get_audio, inputs=[original_task_id], outputs=[audio_status, audio_dropdown, separation_type, submit_btn, submission_output] ) # Submit button starts polling submit_btn.click( fn=on_submit, inputs=[original_task_id, audio_dropdown, separation_type, polling_active_flag], outputs=[submission_output, current_separation_task_id, auto_poll_status, auto_poll_progress, stop_poll_btn, polling_active_flag] ) # Create a separate function for polling that runs independently def start_polling_thread(separation_task_id, attempt, active_flag): """Background polling thread""" while active_flag and attempt < 60: time.sleep(5) attempt += 1 status, results, error = poll_separation_status(separation_task_id) if status == "SUCCESS" or status == "FAILED" or error: # Update UI via queue or state break return attempt # Manual polling poll_btn.click( fn=manual_poll, inputs=[poll_task_id], outputs=[poll_output] ) # Stop polling button stop_poll_btn.click( fn=stop_polling, inputs=[polling_active_flag], outputs=[auto_poll_status, auto_poll_progress, download_output, polling_active_flag, stop_poll_btn] ) # Auto-fill poll field when separation task is submitted def update_poll_field(separation_task_id): if separation_task_id: return separation_task_id return "" current_separation_task_id.change( fn=update_poll_field, inputs=[current_separation_task_id], outputs=[poll_task_id] ) # Simple polling timer (manual refresh) def manual_refresh(separation_task_id, attempt, active_flag): if active_flag and separation_task_id: return update_auto_poll(separation_task_id, attempt, active_flag) return "⏳ Waiting for task...", 0, "", active_flag # Add a refresh button for manual polling updates refresh_btn = gr.Button("🔄 Refresh Status", variant="secondary", visible=False) # Show/hide refresh button based on polling state def toggle_refresh_button(active_flag): return gr.Button(visible=active_flag) polling_active_flag.change( fn=toggle_refresh_button, inputs=[polling_active_flag], outputs=[refresh_btn] ) refresh_btn.click( fn=manual_refresh, inputs=[current_separation_task_id, auto_poll_progress, polling_active_flag], outputs=[auto_poll_status, auto_poll_progress, download_output, polling_active_flag] ) # Launch the app if __name__ == "__main__": print("🚀 Starting Suno Stem Separator") print(f"🔑 SunoKey: {'✅ Set' if SUNO_KEY else '❌ Not set'}") print("🌐 Open your browser to: http://localhost:7860") # Launch with theme parameter in launch() method app.launch( server_name="0.0.0.0", server_port=7860, share=False, title="Suno Stem Separator", theme="soft" )