# Page-header residue captured from the hosting site ("Spaces: Running"); not part of the program.
| import gradio as gr | |
| import requests | |
| import os | |
| import json | |
| import uuid | |
| import time | |
| import threading | |
| from datetime import datetime | |
| from typing import Dict, Optional | |
# Suno API credential, read once at import time from the "SunoKey"
# environment variable (empty string when the variable is absent).
SUNO_KEY = os.getenv("SunoKey", "")
if not SUNO_KEY:
    print("⚠️ Warning: SunoKey environment variable not set!")

# In-memory registries shared by the request handlers and the polling threads.
# tasks_db maps a local task ID to that task's record (a plain dict);
# polling_threads maps a local task ID to its background polling thread.
tasks_db: Dict[str, dict] = {}
polling_threads: Dict[str, threading.Thread] = {}
def poll_task_status(task_id: str, max_attempts=60, interval=2):
    """Poll the lyrics-details endpoint until the task settles or attempts run out.

    Meant to run in a background thread (see ``generate_lyrics``). The record
    stored in ``tasks_db[task_id]`` is updated in place: ``status``,
    ``last_checked`` and ``poll_attempts`` on every poll, plus ``result`` or
    ``error`` and ``completed_at`` once the task completes or fails.

    Args:
        task_id: Local task ID (a key of ``tasks_db``). Unknown IDs are ignored.
        max_attempts: Maximum number of polls before giving up.
        interval: Seconds to wait between two polls.

    Returns:
        None. All results are written to ``tasks_db``.
    """
    task = tasks_db.get(task_id)
    if task is None:
        return

    url = "https://api.sunoapi.org/api/v1/lyrics/details"
    headers = {"Authorization": f"Bearer {SUNO_KEY}"}
    # Passing the ID through ``params`` lets requests URL-encode it.
    params = {"taskId": task["api_task_id"]}

    try:
        for attempt in range(max_attempts):
            # Count the attempt up front so failed requests are counted too.
            task["poll_attempts"] = attempt + 1
            try:
                response = requests.get(url, headers=headers, params=params, timeout=30)
                data = response.json()
                if response.status_code == 200 and data.get("code") == 200:
                    task_data = data["data"]
                    status = task_data.get("status", "unknown")
                    task["status"] = status
                    task["last_checked"] = datetime.now().isoformat()

                    # NOTE(review): the "completed"/"failed" status strings and the
                    # nested "data" payload are kept exactly as this file already
                    # used them — confirm against the current Suno API docs.
                    if status == "completed" and "data" in task_data:
                        task["result"] = task_data["data"]
                        task["completed_at"] = datetime.now().isoformat()
                        print(f"✅ Task {task_id} completed via auto-polling")
                        break
                    if status == "failed":
                        # ``or`` also covers an explicit null error field.
                        error_msg = task_data.get("error") or "Unknown error"
                        task["error"] = error_msg
                        task["completed_at"] = datetime.now().isoformat()
                        print(f"❌ Task {task_id} failed: {error_msg}")
                        break
            except Exception as e:
                # Best effort: a single failed poll must not kill the thread.
                print(f"⚠️ Polling error for task {task_id}: {str(e)}")

            # No point sleeping after the final attempt.
            if attempt < max_attempts - 1:
                time.sleep(interval)
        else:
            # Attempts exhausted without a terminal status: stop advertising
            # auto-polling so the status view asks for a manual refresh.
            task["auto_poll"] = False
            print(f"⏱️ Auto-polling for task {task_id} stopped after {max_attempts} attempts")
    finally:
        # Always drop the thread registration, even on an unexpected error.
        polling_threads.pop(task_id, None)
def generate_lyrics(prompt: str, auto_poll: bool = True) -> str:
    """Submit a lyrics-generation task to the Suno API and register it locally.

    On success a record is added to ``tasks_db`` under a new short local task
    ID and, when ``auto_poll`` is true, a daemon thread running
    ``poll_task_status`` is started for it.

    Args:
        prompt: Free-text description of the lyrics to generate.
        auto_poll: Whether to poll for the result in the background.

    Returns:
        A Markdown message describing the outcome. Success messages contain
        the line ``**Your Task ID:** `<id>``` which the UI parses, so the
        message layout must stay stable. Errors are reported in the returned
        text; the function does not raise.
    """
    if not SUNO_KEY:
        return "❌ Error: SunoKey environment variable not set. Please add it in Space Settings."
    if not prompt or not prompt.strip():
        return "❌ Please enter a lyrics prompt"

    # Short local ID; regenerate on the (unlikely) collision so an existing
    # task record is never overwritten.
    task_id = str(uuid.uuid4())[:8]
    while task_id in tasks_db:
        task_id = str(uuid.uuid4())[:8]

    url = "https://api.sunoapi.org/api/v1/lyrics"
    headers = {
        "Authorization": f"Bearer {SUNO_KEY}",
        "Content-Type": "application/json",
    }
    # Suno requires a callback URL, but results are fetched by polling instead.
    dummy_callback = "https://dummy.callback.url/not-used"
    payload = {
        "prompt": prompt,
        "callBackUrl": dummy_callback,
    }

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=30)

        # Error responses are not always JSON (e.g. an HTML gateway page);
        # fall back to an empty dict so the HTTP status is reported instead
        # of a JSON decoding error.
        try:
            data = response.json()
        except ValueError:
            data = {}
        if not isinstance(data, dict):
            data = {}

        accepted = response.status_code == 200 and data.get("code") == 200
        body = data.get("data")
        api_task_id = body.get("taskId") if isinstance(body, dict) else None

        if not accepted or not api_task_id:
            if accepted:
                error_msg = "API response did not include a task ID"
            else:
                error_msg = data.get("msg") or f"HTTP {response.status_code}"
            return f"""❌ **Submission Failed**
**Error:** {error_msg}
💡 **Possible solutions:**
• Check if your SunoKey is valid
• Try a different prompt
• Wait a few minutes and retry"""

        # Register the task before the polling thread starts so the thread
        # always finds its record.
        tasks_db[task_id] = {
            "id": task_id,
            "api_task_id": api_task_id,
            "prompt": prompt,
            "status": "submitted",
            "result": None,
            "error": None,
            "created_at": datetime.now().isoformat(),
            "last_checked": None,
            "poll_attempts": 0,
            "completed_at": None,
            "auto_poll": auto_poll,
        }

        if auto_poll:
            poll_thread = threading.Thread(
                target=poll_task_status,
                args=(task_id,),
                daemon=True,
            )
            polling_threads[task_id] = poll_thread
            poll_thread.start()
            print(f"🚀 Started auto-polling for task {task_id}")

        prompt_preview = f"{prompt[:100]}{'...' if len(prompt) > 100 else ''}"
        if auto_poll:
            polling_note = '🔄 **Auto-polling enabled** - Results will appear automatically!'
        else:
            polling_note = '📊 **Auto-polling disabled** - Check status manually'

        return f"""✅ **Task Submitted Successfully!**
**Your Task ID:** `{task_id}`
**API Task ID:** `{api_task_id}`
📝 **Prompt:** {prompt_preview}
{polling_note}
⏳ **Estimated time:** 10-30 seconds
💡 **Tip:** Save your Task ID: `{task_id}`"""
    except requests.exceptions.Timeout:
        return "❌ Error: Request timeout - Suno API is not responding"
    except requests.exceptions.ConnectionError:
        return "❌ Error: Connection failed - Check your internet connection"
    except Exception as e:
        # UI boundary: report anything unexpected as text rather than raising.
        return f"❌ Error: {str(e)}"
def check_task_status(task_id: str, auto_refresh: bool = False) -> str:
    """Describe the current state of a task as Markdown.

    Args:
        task_id: Local task ID as shown to the user. Surrounding whitespace
            is ignored, so a pasted ID with a trailing space or newline is
            still found.
        auto_refresh: When true, query the Suno API once before reporting and
            store the fresh status (and result or error) in ``tasks_db``.

    Returns:
        Markdown text: the formatted lyrics when the task is completed, a
        failure notice when it failed, a progress summary otherwise, or an
        error message for an empty or unknown ID.
    """
    if not task_id or not task_id.strip():
        return "❌ Please enter a Task ID"

    # Look up the same normalized value that was validated above.
    task_id = task_id.strip()
    task = tasks_db.get(task_id)
    if task is None:
        return f"❌ Task ID `{task_id}` not found. Please submit a task first."

    if auto_refresh:
        try:
            response = requests.get(
                "https://api.sunoapi.org/api/v1/lyrics/details",
                headers={"Authorization": f"Bearer {SUNO_KEY}"},
                params={"taskId": task["api_task_id"]},
                timeout=30,
            )
            data = response.json()
            if response.status_code == 200 and data.get("code") == 200:
                task_data = data["data"]
                fresh_status = task_data.get("status", "unknown")
                now = datetime.now().isoformat()
                task["status"] = fresh_status
                task["last_checked"] = now
                if fresh_status == "completed" and "data" in task_data:
                    task["result"] = task_data["data"]
                    # Keep the first completion time across repeated refreshes.
                    task["completed_at"] = task.get("completed_at") or now
                elif fresh_status == "failed":
                    # Record the failure the same way the background poller does.
                    task["error"] = task_data.get("error") or "Unknown error"
                    task["completed_at"] = task.get("completed_at") or now
        except Exception as e:
            # Best effort: keep the stored status, but leave a trace in the log.
            print(f"⚠️ Status refresh failed for task {task_id}: {str(e)}")

    status = task.get("status", "unknown")
    created_time = datetime.fromisoformat(task["created_at"])
    elapsed = int((datetime.now() - created_time).total_seconds())

    if status == "completed" and task.get("result"):
        return format_lyrics_output(task["result"], task_id, elapsed)

    if status == "failed":
        # The "error" key always exists (initialised to None), so a plain
        # .get() default would never apply.
        error_msg = task.get("error") or "Unknown error"
        return f"""❌ **Task Failed**
**Task ID:** `{task_id}`
**Error:** {error_msg}
**Elapsed time:** {elapsed} seconds
💡 Please try generating again with a different prompt."""

    # Still processing (or in a state this app does not recognise).
    attempts = task.get("poll_attempts", 0)
    last_checked = task.get("last_checked")
    last_checked_str = ""
    if last_checked:
        last_time = datetime.fromisoformat(last_checked)
        last_checked_str = f"\n**Last checked:** {last_time.strftime('%H:%M:%S')}"

    if task.get("auto_poll"):
        polling_state = 'Auto-polling active'
        next_step = '✅ Results will appear automatically when ready!'
    else:
        polling_state = 'Auto-polling disabled'
        next_step = '📊 Click "Check Status" again to refresh'

    return f"""⏳ **Task Processing...**
**Task ID:** `{task_id}`
**Status:** {status}
**Elapsed:** {elapsed} seconds
**Poll attempts:** {attempts}{last_checked_str}
⏰ **Estimated completion:** 10-30 seconds
🔄 {polling_state}
{next_step}"""
def format_lyrics_output(lyrics_data, task_id, elapsed_time):
    """Render generated lyric variants as a Markdown document.

    Args:
        lyrics_data: Usually a list of dicts with optional ``title`` and
            ``text`` keys. A single dict is treated as a one-item list, and
            non-dict items are shown via ``str()``.
        task_id: Local task ID shown in the heading.
        elapsed_time: Seconds between submission and now, shown verbatim.

    Returns:
        The Markdown text, or a short notice when ``lyrics_data`` is empty.
    """
    if not lyrics_data:
        return "✅ Task completed but no lyrics data received"

    # Iterating a bare dict would walk its keys, so wrap it.
    if isinstance(lyrics_data, dict):
        lyrics_data = [lyrics_data]

    output_lines = [
        f"# 🎵 Generated Lyrics (Task: {task_id})",
        "",
        f"**Generated in:** {elapsed_time} seconds",
        f"**Completed at:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
    ]

    for i, item in enumerate(lyrics_data, 1):
        if isinstance(item, dict):
            # ``or`` also covers keys that are present but null or empty, which
            # would otherwise render as "None" or break the final join.
            title = item.get('title') or f'Variant {i}'
            lyrics = item.get('text') or 'No lyrics generated'
        else:
            title = f'Variant {i}'
            lyrics = str(item) if item else 'No lyrics generated'

        output_lines.extend([
            f"## Variant {i}: {title}",
            "```",
            str(lyrics),
            "```",
            "---",
            "",
        ])

    output_lines.append("### 🎉 All done!")
    output_lines.append("You can generate more lyrics or try different prompts.")
    return "\n".join(output_lines)
def list_all_tasks():
    """List every task in ``tasks_db`` as Markdown, newest first.

    Returns:
        Markdown with one entry per task (status, prompt preview, timestamps)
        followed by a summary line, or a notice when no task exists yet.
    """
    if not tasks_db:
        return "📭 No tasks found. Generate some lyrics first!"

    finished_states = ("completed", "failed", "error")
    output_lines = ["# 📋 All Submitted Tasks", ""]
    completed_count = 0
    processing_count = 0

    newest_first = sorted(
        tasks_db.items(),
        key=lambda entry: entry[1].get("created_at") or "",
        reverse=True,
    )
    for task_id, task in newest_first:
        status = task.get("status", "unknown")

        # Collapse whitespace so a multi-line prompt stays on one line, and
        # add the ellipsis only when text was actually cut off.
        prompt_text = " ".join((task.get("prompt") or "").split())
        prompt_preview = prompt_text[:50] + ("..." if len(prompt_text) > 50 else "")
        created = (task.get("created_at") or "")[:19]

        if status == "completed":
            icon, color = "✅", "green"
            completed_count += 1
        elif status in ("failed", "error"):
            icon, color = "❌", "red"
        else:
            icon, color = "⏳", "orange"
        if status not in finished_states:
            processing_count += 1

        output_lines.append(f"<span style='color:{color}'>{icon} **{task_id}** - {status}</span>")
        output_lines.append(f" Prompt: {prompt_preview}")
        output_lines.append(f" Created: {created}")
        if task.get("completed_at"):
            output_lines.append(f" Completed: {task['completed_at'][:19]}")
        if task.get("auto_poll"):
            output_lines.append(" Auto-polling: ✅ Enabled")
        output_lines.append("")

    total = len(tasks_db)
    output_lines.append(
        f"**Summary:** {completed_count} completed, {processing_count} processing, {total} total"
    )
    return "\n".join(output_lines)
def auto_refresh_display(task_id: str):
    """Return ``(task_id, status_markdown)`` after a forced status refresh.

    An empty or unknown ID yields an "Invalid Task ID" message instead of a
    refresh.
    """
    is_known = bool(task_id) and task_id in tasks_db
    if is_known:
        return task_id, check_task_status(task_id, auto_refresh=True)
    return task_id, "❌ Invalid Task ID"
# Static Markdown copy for the interface, kept at module level so the text
# is not buried in the nested layout code below.
_TIPS_MD = """
• Be descriptive in your prompt
• Include genre, mood, or theme
• Typical processing time: 10-30 seconds
• Auto-polling will show results automatically
"""

_HOW_TO_MD = """
1. **Generate Tab:**
- Enter lyrics prompt
- Enable auto-polling (recommended)
- Click Generate
- Save your Task ID
2. **Check Status Tab:**
- Paste your Task ID
- Click "Check Status" for manual check
- Click "Auto-refresh" for automatic updates
- Results appear automatically with auto-polling
3. **Results:**
- Appear automatically if auto-polling enabled
- Can take 10-30 seconds
- Multiple lyric variants provided
"""

_FEATURES_MD = """
**Auto-polling:**
- Background thread checks status every 2 seconds
- Results appear automatically
- Stops after completion or 2 minutes
**Manual checking:**
- Check specific Task ID status
- Auto-refresh button for continuous updates
- List all submitted tasks
**Task management:**
- All tasks stored in memory
- Status tracking
- Error handling
"""

_TROUBLESHOOTING_MD = """
**If tasks aren't completing:**
1. Check your SunoKey has available credits
2. Try simpler prompts
3. Wait 30+ seconds for processing
4. Use the "Auto-refresh" button
**Common errors:**
- "Task ID not found": Submit a new task
- "SunoKey not set": Add API key in Space Settings
- "Timeout": Suno API may be busy
"""

# Create the Gradio interface
with gr.Blocks(title="Suno Lyrics Generator", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🎵 Suno AI Lyrics Generator")
    gr.Markdown("Generate song lyrics using Suno's AI API")

    # Holds the most recently submitted task ID so the Check Status tab can
    # pre-fill its textbox.
    current_task_id = gr.State("")

    with gr.Tabs():
        # Tab 1: Generate Lyrics
        with gr.TabItem("✨ Generate"):
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("### Enter your lyrics idea")
                    prompt_input = gr.Textbox(
                        label="Lyrics Prompt",
                        placeholder="Example: A romantic ballad about stargazing on a summer night...",
                        lines=4,
                    )
                    auto_poll_checkbox = gr.Checkbox(
                        label="🔁 Enable auto-polling",
                        value=True,
                        info="Automatically check for results (recommended)",
                    )
                    submit_btn = gr.Button("🚀 Generate Lyrics", variant="primary", size="lg")
                    gr.Markdown("### 📝 Tips:")
                    gr.Markdown(_TIPS_MD)
                with gr.Column(scale=3):
                    output_area = gr.Markdown(
                        label="Result",
                        value="Your task submission result will appear here...",
                    )

            def handle_generation(prompt, auto_poll):
                """Submit the prompt and return ``(result_markdown, task_id)``.

                ``generate_lyrics`` only returns text, so the local task ID is
                recovered from the first line containing ``Task ID:``, where
                it is wrapped in backticks. ``task_id`` is "" on failure.
                """
                result = generate_lyrics(prompt, auto_poll)
                task_id = ""
                for line in result.split('\n'):
                    if 'Task ID:' in line:
                        parts = line.split('`')
                        if len(parts) > 1:
                            task_id = parts[1]
                            break
                return result, task_id

            submit_btn.click(
                fn=handle_generation,
                inputs=[prompt_input, auto_poll_checkbox],
                outputs=[output_area, current_task_id],
            )

        # Tab 2: Check Status
        with gr.TabItem("🔍 Check Status"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Enter your Task ID")
                    task_id_input = gr.Textbox(
                        label="Task ID",
                        placeholder="Paste your Task ID here (e.g., 7d3b63c4)",
                        scale=1,
                    )

                    def update_task_id_input(current_id):
                        """Mirror the latest submitted task ID into the textbox."""
                        return current_id if current_id else ""

                    # Auto-populate if we have a current task ID
                    current_task_id.change(
                        fn=update_task_id_input,
                        inputs=current_task_id,
                        outputs=task_id_input,
                    )

                    with gr.Row():
                        check_btn = gr.Button("🔍 Check Status", variant="primary")
                        auto_refresh_btn = gr.Button("🔄 Auto-refresh", variant="secondary")
                        stop_refresh_btn = gr.Button("⏹️ Stop", variant="stop", visible=False)

                    gr.Markdown("---")
                    refresh_all_btn = gr.Button("📋 List All Tasks")
                    tasks_list = gr.Markdown(label="All Tasks")
                with gr.Column():
                    status_output = gr.Markdown(
                        label="Status",
                        value="Enter a Task ID above and click Check Status",
                    )

            # Regular check
            check_btn.click(
                fn=check_task_status,
                inputs=task_id_input,
                outputs=status_output,
            )

            def start_auto_refresh(task_id):
                """First step of auto-refresh: validate the ID and swap the buttons.

                Returns ``(status_markdown, stop_button_update, refresh_button_update)``.
                """
                if not task_id or not task_id.strip():
                    return (
                        "❌ Please enter a Task ID first",
                        gr.update(visible=False),
                        gr.update(visible=True),
                    )
                return (
                    "🔄 Starting auto-refresh...",
                    gr.update(visible=True),
                    gr.update(visible=False),
                )

            def do_auto_refresh(task_id, iterations=10):
                """Refresh the task status repeatedly, yielding after every check.

                Written as a generator so the UI shows each intermediate
                status and so the Stop button can cancel the loop between
                checks. Every yielded tuple is ``(task_id, status_markdown,
                stop_button_update, refresh_button_update)``; the last one
                always restores the idle button state.
                """
                task_id = (task_id or "").strip()
                if not task_id:
                    # Keep the validation message instead of overwriting it.
                    yield (
                        task_id,
                        "❌ Please enter a Task ID first",
                        gr.update(visible=False),
                        gr.update(visible=True),
                    )
                    return
                if task_id not in tasks_db:
                    yield (
                        task_id,
                        check_task_status(task_id),
                        gr.update(visible=False),
                        gr.update(visible=True),
                    )
                    return

                result = "Auto-refresh stopped"
                for i in range(iterations):
                    result = check_task_status(task_id, auto_refresh=True)
                    settled = tasks_db.get(task_id, {}).get("status") in ("completed", "failed")
                    if settled or i == iterations - 1:
                        break
                    # Intermediate update: keep the Stop button available.
                    yield task_id, result, gr.update(visible=True), gr.update(visible=False)
                    time.sleep(3)

                # Final update: show the last status and restore the buttons.
                yield task_id, result, gr.update(visible=False), gr.update(visible=True)

            # Connect auto-refresh button
            auto_refresh_click = auto_refresh_btn.click(
                fn=start_auto_refresh,
                inputs=task_id_input,
                outputs=[status_output, stop_refresh_btn, auto_refresh_btn],
            ).then(
                fn=do_auto_refresh,
                inputs=task_id_input,
                outputs=[task_id_input, status_output, stop_refresh_btn, auto_refresh_btn],
            )

            # Stop button: cancel the running refresh loop and restore the
            # buttons. The Task ID textbox is left alone so the user can
            # check the same task again without re-pasting the ID.
            stop_refresh_btn.click(
                fn=lambda: (
                    "⏹️ Auto-refresh stopped",
                    gr.update(visible=False),
                    gr.update(visible=True),
                ),
                inputs=None,
                outputs=[status_output, stop_refresh_btn, auto_refresh_btn],
                cancels=[auto_refresh_click],
            )

            # List all tasks
            refresh_all_btn.click(
                fn=list_all_tasks,
                inputs=None,
                outputs=tasks_list,
            )

        # Tab 3: Help & Info
        with gr.TabItem("ℹ️ Help"):
            gr.Markdown("# Help & Information")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 🔑 API Status")
                    api_status = "✅ Configured" if SUNO_KEY else "❌ Not configured"
                    gr.Markdown(f"**SunoKey:** {api_status}")
                    gr.Markdown("### 🚀 How to Use:")
                    gr.Markdown(_HOW_TO_MD)
                with gr.Column():
                    gr.Markdown("### 🔧 Features")
                    gr.Markdown(_FEATURES_MD)
                    gr.Markdown("### ⚠️ Troubleshooting")
                    gr.Markdown(_TROUBLESHOOTING_MD)

# Generator handlers and ``cancels`` need Gradio's queue. Enabling it
# explicitly keeps them working on releases where it is not on by default.
app.queue()
# Script entry point: report the key status, then serve the app.
if __name__ == "__main__":
    key_state = 'Configured' if SUNO_KEY else 'NOT SET'
    print(f"🔑 SunoKey status: {key_state}")
    print("🚀 Starting Suno Lyrics Generator with auto-polling...")

    # Listen on all interfaces on port 7860, with no public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": False,
        "show_error": True,
    }
    app.launch(**launch_options)