File size: 8,442 Bytes
6b44ecd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import gradio as gr
import requests
import os
import time
import zipfile
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed

# ==========================================
# ⚙️ 核心配置
# ==========================================
SUNO_API_KEY = "2335a91def6e8c7b851c286ffd80f0df"
UPLOAD_URL = "https://kieai.redpandaai.co/api/file-stream-upload"
COVER_URL = "https://api.kie.ai/api/v1/generate/upload-cover"
QUERY_URL = "https://api.kie.ai/api/v1/generate/record-info"
DUMMY_CALLBACK = "https://example.com/callback"

# 最大并发数
MAX_WORKERS = 5

def log(msg):
    return f"[{time.strftime('%H:%M:%S')}] {msg}"

# ==========================================
# 🛠️ 基础功能
# ==========================================
def upload_file(file_obj):
    try:
        file_path = file_obj.name
        file_name = os.path.basename(file_path)
        headers = {"Authorization": f"Bearer {SUNO_API_KEY}"}
        files = {'file': (file_name, open(file_path, 'rb'))}
        data = {'uploadPath': 'suno-inst-copy'}
        
        res = requests.post(UPLOAD_URL, headers=headers, files=files, data=data)
        res_json = res.json()
        
        url = res_json.get('data', {}).get('downloadUrl') or res_json.get('data', {}).get('fileUrl')
        if url: return {"status": "success", "name": file_name, "url": url}
        return {"status": "failed", "name": file_name, "msg": "No URL returned"}
    except Exception as e:
        return {"status": "error", "name": os.path.basename(file_obj.name), "msg": str(e)}

# ==========================================
# 🎹 核心:复刻任务提交 (接收相似度参数)
# ==========================================
def submit_copy_task(upload_data, similarity_val):
    if upload_data['status'] != 'success': return None
    
    # 万能高保真 Prompt
    universal_prompt = "Instrumental, High Fidelity, Cinematic, Clear Production, Masterpiece, Original Vibe"
    
    payload = {
        "uploadUrl": upload_data['url'],
        "callBackUrl": DUMMY_CALLBACK,
        "model": "V5",
        
        "instrumental": True,       # 必须:纯音乐
        "prompt": universal_prompt, # 通用高保真提示词
        "lyrics": "",               # 无歌词
        "customMode": True,
        "title": f"{upload_data['name']} (Sim_{int(similarity_val*100)}%)",
        
        # ⚡️ 核心修改点:动态相似度 ⚡️
        "styleWeight": 0.95,          # 风格权重保持高位,确保流派不偏
        "audioWeight": similarity_val # 音频权重由滑块控制
    }
    
    try:
        res = requests.post(COVER_URL, headers={"Authorization": f"Bearer {SUNO_API_KEY}", "Content-Type": "application/json"}, json=payload)
        data = res.json()
        if data.get('code') == 200:
            return {"id": data['data']['taskId'], "name": upload_data['name']}
        return {"failed": True, "name": upload_data['name'], "msg": data.get('msg')}
    except Exception as e:
        return {"failed": True, "name": upload_data['name'], "msg": str(e)}

# ==========================================
# 🔄 流程控制
# ==========================================
def run_one_click_copy(files, similarity):
    logs = [log(f"🚀 启动复刻流程 | 目标相似度: {similarity}")]
    if not files: return "\n".join(logs), None
    
    # 1. 上传
    logs.append(log(f"⬆️ 正在上传 {len(files)} 个音频..."))
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        uploads = list(pool.map(upload_file, files))
    
    # 2. 提交
    logs.append(log(f"⚙️ 提交任务..."))
    tasks = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        # 将 similarity 参数传递给 submit 函数
        futures = [pool.submit(submit_copy_task, up, similarity) for up in uploads]
        for f in as_completed(futures):
            res = f.result()
            if res and not res.get('failed'):
                tasks.append(res)
                logs.append(log(f"  ✅ 任务建立: {res['name']}"))
            else:
                logs.append(log(f"  ❌ 建立失败: {res.get('name')} - {res.get('msg')}"))
    
    if not tasks:
        logs.append(log("❌ 无有效任务,流程结束"))
        return "\n".join(logs), None

    # 3. 监控
    logs.append(log(f"⏳ 等待处理 (共 {len(tasks)} 个)..."))
    yield "\n".join(logs), None
    
    completed_urls = []
    headers = {"Authorization": f"Bearer {SUNO_API_KEY}"}
    start_time = time.time()
    
    while len(completed_urls) < len(tasks):
        if time.time() - start_time > 1200: break
        
        pending = False
        for task in tasks:
            if any(c['id'] == task['id'] for c in completed_urls): continue
            if task.get('failed_final'): continue
            
            try:
                r = requests.get(QUERY_URL, headers=headers, params={"taskId": task['id']}).json()
                status = r.get('data', {}).get('status')
                
                if status == 'SUCCESS':
                    data = r['data']['response'].get('sunoData', [])
                    if data:
                        info = {"id": task['id'], "url": data[0]['audioUrl'], "name": task['name']}
                        completed_urls.append(info)
                        logs.append(log(f"  🎉 完成: {task['name']}"))
                        yield "\n".join(logs), None
                elif status in ['GENERATE_AUDIO_FAILED', 'SENSITIVE_WORD_ERROR']:
                    task['failed_final'] = True
                    logs.append(log(f"  ❌ 平台生成失败: {task['name']}"))
                    yield "\n".join(logs), None
                else:
                    pending = True
            except: pending = True
            
        if pending: time.sleep(5)
        else: break
        
    # 4. 打包
    logs.append(log("📦 打包文件中..."))
    temp_dir = f"Inst_Copy_{int(time.time())}"
    os.makedirs(temp_dir, exist_ok=True)
    
    cnt = 0
    for item in completed_urls:
        try:
            with requests.get(item['url'], stream=True) as r:
                fname = f"{os.path.splitext(item['name'])[0]}_Remix.mp3"
                with open(os.path.join(temp_dir, fname), 'wb') as f:
                    f.write(r.content)
            cnt += 1
        except: pass
        
    if cnt > 0:
        zip_path = f"{temp_dir}.zip"
        with zipfile.ZipFile(zip_path, 'w') as z:
            for root, _, fs in os.walk(temp_dir):
                for f in fs: z.write(os.path.join(root, f), f)
        shutil.rmtree(temp_dir)
        logs.append(log(f"✅ 完成!成功下载 {cnt} 个文件"))
        yield "\n".join(logs), zip_path
    else:
        shutil.rmtree(temp_dir)
        yield "\n".join(logs), None

# ==========================================
# 🖥️ 界面构建
# ==========================================
with gr.Blocks(title="Suno 纯音乐复刻专家") as demo:
    gr.Markdown("## 🎹 Suno 纯音乐复刻专家 (Instrumental Copy)")
    gr.Markdown("上传音频 -> 调节相似度 -> 一键生成。")
    
    with gr.Row():
        with gr.Column(scale=1):
            file_in = gr.File(label="拖入音频文件 (支持批量)", file_count="multiple", height=200)
            
            # 🔥 新增:相似度调节滑块 🔥
            similarity_in = gr.Slider(
                minimum=0.1, 
                maximum=0.99, 
                value=0.95, 
                step=0.01, 
                label="🎚️ 复刻相似度 (Audio Weight)",
                info="⚠️ 调节说明:\n"
                     "【0.90 - 0.99】:高保真复刻,几乎和原曲一样,仅音质/混音微调。\n"
                     "【0.60 - 0.80】:保留旋律框架,但在乐器和氛围上有明显变化(二创)。\n"
                     "【0.30 - 0.50】:仅保留一点原曲的影子,大幅度重写(魔改)。"
            )
            
            btn = gr.Button("🚀 开始制作", variant="primary", size="lg")
        
        with gr.Column(scale=1):
            log_box = gr.Textbox(label="运行日志", lines=12)
            out_file = gr.File(label="下载结果 (ZIP)")
            
    # 将 similarity_in 加入输入列表
    btn.click(run_one_click_copy, inputs=[file_in, similarity_in], outputs=[log_box, out_file])

if __name__ == "__main__":
    demo.launch()