File size: 6,453 Bytes
5621d80
 
7155a3e
3f1841d
c3f6215
d3cdef9
c3f6215
c063c00
 
 
e1c2b50
3f1841d
 
77c8e1c
3f1841d
 
1f1b856
3f1841d
5621d80
3f1841d
 
6c0850c
3f1841d
 
 
 
1f1b856
3f1841d
 
 
 
c063c00
3f1841d
5621d80
3f1841d
 
5621d80
2eaa647
5621d80
3f1841d
 
 
 
 
 
 
 
 
 
 
6c0850c
3f1841d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3f6215
3f1841d
 
 
 
 
 
 
 
 
 
 
 
 
6c0850c
3f1841d
 
 
 
c3f6215
3f1841d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1f1b856
3f1841d
 
 
1f1b856
3f1841d
 
 
 
 
 
 
 
1f1b856
3f1841d
 
 
 
 
 
 
 
1f1b856
3f1841d
 
 
 
 
 
 
 
 
 
 
 
6c0850c
3f1841d
2eaa647
3f1841d
 
 
 
 
 
 
 
 
c063c00
3f1841d
1f1b856
3f1841d
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import gradio as gr
import requests
import os
import time
from datetime import datetime

# Suno API key, read from the "SunoKey" environment variable ("" when unset).
SUNO_KEY = os.getenv("SunoKey", "")
if SUNO_KEY == "":
    print("⚠️ Warning: SunoKey environment variable not set!")

# In-memory task registry; entries are added by generate_lyrics_simple.
tasks = {}

def generate_lyrics_simple(prompt):
    """Submit a lyrics request to the Suno API and wait for its outcome.

    The prompt is POSTed to the ``/api/v1/lyrics`` endpoint. When the API
    accepts it, the task is recorded in the module-level ``tasks`` dict and
    the call blocks in ``poll_for_results`` until that helper returns.

    Args:
        prompt: Text describing the lyrics to generate. ``None``, non-string,
            empty and whitespace-only values are rejected.

    Returns:
        The value returned by ``poll_for_results`` when the submission is
        accepted; otherwise a message starting with "❌" that explains why
        the request was not submitted. Exceptions raised during submission
        or polling are reported as a message rather than propagated.
    """
    if not SUNO_KEY:
        return "❌ Error: SunoKey not configured"

    # Reject None / non-string input as well as blank text: calling .strip()
    # on None would raise here, before the try block below can catch it.
    if not isinstance(prompt, str) or not prompt.strip():
        return "❌ Please enter a prompt"

    # Local tracking key: last six digits of the current Unix time (seconds).
    # NOTE(review): two submissions within the same second get the same key
    # and the later one overwrites the earlier entry in `tasks`.
    task_id = str(int(time.time()))[-6:]

    submit_url = "https://api.sunoapi.org/api/v1/lyrics"
    headers = {
        "Authorization": f"Bearer {SUNO_KEY}",
        "Content-Type": "application/json",
    }

    # The callback URL is a placeholder; results are fetched by polling.
    payload = {
        "prompt": prompt,
        "callBackUrl": "http://dummy.callback/not-used",
    }

    try:
        # Step 1: submit the task
        print(f"📤 Submitting: {prompt[:50]}...")
        response = requests.post(submit_url, json=payload, headers=headers, timeout=30)

        if response.status_code != 200:
            return f"❌ Submission failed: HTTP {response.status_code}"

        data = response.json()

        if data.get("code") != 200:
            return f"❌ API error: {data.get('msg', 'Unknown error')}"

        # A success code without a usable task ID cannot be polled, so report
        # it explicitly instead of surfacing a bare KeyError/TypeError text.
        suno_task_id = (data.get("data") or {}).get("taskId")
        if not suno_task_id:
            return "❌ API error: response did not include a task ID"
        print(f"✅ Submitted! Suno Task ID: {suno_task_id}")

        tasks[task_id] = {
            "suno_id": suno_task_id,
            "prompt": prompt,
            "status": "submitted",
            "start_time": datetime.now(),
            "checks": 0,
        }

        # Step 2: short pause before the first status check
        time.sleep(2)

        # Step 3: poll the record-info endpoint until a result is available
        return poll_for_results(task_id, suno_task_id)

    except Exception as e:
        # Boundary for the caller: report any failure as a message.
        return f"❌ Error: {str(e)}"

def _fetch_record_info(poll_url, headers, suno_task_id):
    """Query the record-info endpoint, trying each candidate parameter name.

    Returns the most recent JSON object received with HTTP 200 (stopping at
    the first one whose "code" is 200), or None when no request produced one.
    """
    latest = None
    for param_name in ("taskId", "id", "record_id"):
        try:
            response = requests.get(
                poll_url,
                headers=headers,
                params={param_name: suno_task_id},
                timeout=30,
            )
            if response.status_code != 200:
                continue
            body = response.json()
        except (requests.RequestException, ValueError):
            # Network failure or a body that is not JSON: try the next name.
            continue
        if not isinstance(body, dict):
            # Only JSON objects carry the "code"/"data" fields read below.
            continue
        latest = body
        if body.get("code") == 200:
            break
    return latest


def poll_for_results(task_id, suno_task_id, max_attempts=12):
    """Poll the /record-info endpoint until the task finishes or attempts run out.

    Args:
        task_id: Key of the task's entry in the module-level ``tasks`` dict;
            the entry's "status" and "checks" fields are updated on each
            successful check and its "start_time" is used for elapsed time.
        suno_task_id: Task identifier returned by the Suno API on submission.
        max_attempts: Maximum number of status checks; consecutive checks are
            separated by a 5-second sleep.

    Returns:
        The value produced by ``format_success`` when the task completes with
        lyrics; otherwise a status or error message string.
    """
    headers = {"Authorization": f"Bearer {SUNO_KEY}"}
    poll_url = "https://api.sunoapi.org/api/v1/lyrics/record-info"

    for attempt in range(max_attempts):
        is_last_attempt = attempt >= max_attempts - 1
        try:
            response_data = _fetch_record_info(poll_url, headers, suno_task_id)

            # A failed status check is reported immediately, not retried.
            if not response_data:
                return "⚠️ Could not check status - no response from API"
            if response_data.get("code") != 200:
                return f"⚠️ Status check error: {response_data.get('msg', 'Unknown')}"

            task_info = response_data["data"]

            # The state may be reported under either field name.
            status = task_info.get("status") or task_info.get("state") or "unknown"
            tasks[task_id]["status"] = status
            tasks[task_id]["checks"] = attempt + 1

            # NOTE(review): only the lower-case literals below are recognised;
            # any other state string falls through to wait-and-retry. Confirm
            # them against the API's documented status values.
            if status == "completed":
                # The lyrics payload may live under any of these keys.
                lyrics = (
                    task_info.get("lyrics")
                    or task_info.get("data")
                    or task_info.get("result")
                    or []
                )
                if not lyrics:
                    return "✅ Completed but no lyrics data found"
                elapsed = int((datetime.now() - tasks[task_id]["start_time"]).total_seconds())
                return format_success(lyrics, task_id, elapsed)

            if status == "failed":
                error = task_info.get("error", "Unknown error")
                return f"❌ Task failed: {error}"

            # Not finished yet: wait and check again while attempts remain.
            if not is_last_attempt:
                time.sleep(5)
                continue

            elapsed = int((datetime.now() - tasks[task_id]["start_time"]).total_seconds())
            if status == "processing":
                return f"⏳ Still processing after {elapsed} seconds. Try checking again later."
            return f"📥 Still submitted after {elapsed} seconds. Suno API might be slow."

        except Exception as e:
            # Unexpected failure during a check: retry unless this was the last one.
            if not is_last_attempt:
                time.sleep(5)
                continue
            return f"⚠️ Error checking status: {str(e)}"

    # Reached only when the loop body never runs (max_attempts <= 0).
    return "⏰ Timeout after multiple attempts"

def format_success(lyrics_data, task_id, elapsed_time):
    """Format successful lyrics"""
    # Handle different data structures
    if isinstance(lyrics_data, dict):
        # If it's a dict, try to extract lyrics
        lyrics_text = lyrics_data.get('text') or lyrics_data.get('lyrics') or str(lyrics_data)
        title = lyrics_data.get('title', 'Generated Lyrics')
        
        output = f"""# 🎵 {title} (Task: {task_id})

**Generated in:** {elapsed_time} seconds

```text
{lyrics_text}