import gradio as gr import requests import os import time import zipfile import shutil from concurrent.futures import ThreadPoolExecutor, as_completed # ========================================== # ⚙️ 核心配置 # ========================================== SUNO_API_KEY = "2335a91def6e8c7b851c286ffd80f0df" UPLOAD_URL = "https://kieai.redpandaai.co/api/file-stream-upload" COVER_URL = "https://api.kie.ai/api/v1/generate/upload-cover" QUERY_URL = "https://api.kie.ai/api/v1/generate/record-info" DUMMY_CALLBACK = "https://example.com/callback" # 最大并发数 MAX_WORKERS = 5 def log(msg): return f"[{time.strftime('%H:%M:%S')}] {msg}" # ========================================== # 🛠️ 基础功能 # ========================================== def upload_file(file_obj): try: file_path = file_obj.name file_name = os.path.basename(file_path) headers = {"Authorization": f"Bearer {SUNO_API_KEY}"} files = {'file': (file_name, open(file_path, 'rb'))} data = {'uploadPath': 'suno-inst-copy'} res = requests.post(UPLOAD_URL, headers=headers, files=files, data=data) res_json = res.json() url = res_json.get('data', {}).get('downloadUrl') or res_json.get('data', {}).get('fileUrl') if url: return {"status": "success", "name": file_name, "url": url} return {"status": "failed", "name": file_name, "msg": "No URL returned"} except Exception as e: return {"status": "error", "name": os.path.basename(file_obj.name), "msg": str(e)} # ========================================== # 🎹 核心:复刻任务提交 (接收相似度参数) # ========================================== def submit_copy_task(upload_data, similarity_val): if upload_data['status'] != 'success': return None # 万能高保真 Prompt universal_prompt = "Instrumental, High Fidelity, Cinematic, Clear Production, Masterpiece, Original Vibe" payload = { "uploadUrl": upload_data['url'], "callBackUrl": DUMMY_CALLBACK, "model": "V5", "instrumental": True, # 必须:纯音乐 "prompt": universal_prompt, # 通用高保真提示词 "lyrics": "", # 无歌词 "customMode": True, "title": f"{upload_data['name']} (Sim_{int(similarity_val*100)}%)", # ⚡️ 核心修改点:动态相似度 ⚡️ "styleWeight": 0.95, # 风格权重保持高位,确保流派不偏 "audioWeight": similarity_val # 音频权重由滑块控制 } try: res = requests.post(COVER_URL, headers={"Authorization": f"Bearer {SUNO_API_KEY}", "Content-Type": "application/json"}, json=payload) data = res.json() if data.get('code') == 200: return {"id": data['data']['taskId'], "name": upload_data['name']} return {"failed": True, "name": upload_data['name'], "msg": data.get('msg')} except Exception as e: return {"failed": True, "name": upload_data['name'], "msg": str(e)} # ========================================== # 🔄 流程控制 # ========================================== def run_one_click_copy(files, similarity): logs = [log(f"🚀 启动复刻流程 | 目标相似度: {similarity}")] if not files: return "\n".join(logs), None # 1. 上传 logs.append(log(f"⬆️ 正在上传 {len(files)} 个音频...")) with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: uploads = list(pool.map(upload_file, files)) # 2. 提交 logs.append(log(f"⚙️ 提交任务...")) tasks = [] with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: # 将 similarity 参数传递给 submit 函数 futures = [pool.submit(submit_copy_task, up, similarity) for up in uploads] for f in as_completed(futures): res = f.result() if res and not res.get('failed'): tasks.append(res) logs.append(log(f" ✅ 任务建立: {res['name']}")) else: logs.append(log(f" ❌ 建立失败: {res.get('name')} - {res.get('msg')}")) if not tasks: logs.append(log("❌ 无有效任务,流程结束")) return "\n".join(logs), None # 3. 监控 logs.append(log(f"⏳ 等待处理 (共 {len(tasks)} 个)...")) yield "\n".join(logs), None completed_urls = [] headers = {"Authorization": f"Bearer {SUNO_API_KEY}"} start_time = time.time() while len(completed_urls) < len(tasks): if time.time() - start_time > 1200: break pending = False for task in tasks: if any(c['id'] == task['id'] for c in completed_urls): continue if task.get('failed_final'): continue try: r = requests.get(QUERY_URL, headers=headers, params={"taskId": task['id']}).json() status = r.get('data', {}).get('status') if status == 'SUCCESS': data = r['data']['response'].get('sunoData', []) if data: info = {"id": task['id'], "url": data[0]['audioUrl'], "name": task['name']} completed_urls.append(info) logs.append(log(f" 🎉 完成: {task['name']}")) yield "\n".join(logs), None elif status in ['GENERATE_AUDIO_FAILED', 'SENSITIVE_WORD_ERROR']: task['failed_final'] = True logs.append(log(f" ❌ 平台生成失败: {task['name']}")) yield "\n".join(logs), None else: pending = True except: pending = True if pending: time.sleep(5) else: break # 4. 打包 logs.append(log("📦 打包文件中...")) temp_dir = f"Inst_Copy_{int(time.time())}" os.makedirs(temp_dir, exist_ok=True) cnt = 0 for item in completed_urls: try: with requests.get(item['url'], stream=True) as r: fname = f"{os.path.splitext(item['name'])[0]}_Remix.mp3" with open(os.path.join(temp_dir, fname), 'wb') as f: f.write(r.content) cnt += 1 except: pass if cnt > 0: zip_path = f"{temp_dir}.zip" with zipfile.ZipFile(zip_path, 'w') as z: for root, _, fs in os.walk(temp_dir): for f in fs: z.write(os.path.join(root, f), f) shutil.rmtree(temp_dir) logs.append(log(f"✅ 完成!成功下载 {cnt} 个文件")) yield "\n".join(logs), zip_path else: shutil.rmtree(temp_dir) yield "\n".join(logs), None # ========================================== # 🖥️ 界面构建 # ========================================== with gr.Blocks(title="Suno 纯音乐复刻专家") as demo: gr.Markdown("## 🎹 Suno 纯音乐复刻专家 (Instrumental Copy)") gr.Markdown("上传音频 -> 调节相似度 -> 一键生成。") with gr.Row(): with gr.Column(scale=1): file_in = gr.File(label="拖入音频文件 (支持批量)", file_count="multiple", height=200) # 🔥 新增:相似度调节滑块 🔥 similarity_in = gr.Slider( minimum=0.1, maximum=0.99, value=0.95, step=0.01, label="🎚️ 复刻相似度 (Audio Weight)", info="⚠️ 调节说明:\n" "【0.90 - 0.99】:高保真复刻,几乎和原曲一样,仅音质/混音微调。\n" "【0.60 - 0.80】:保留旋律框架,但在乐器和氛围上有明显变化(二创)。\n" "【0.30 - 0.50】:仅保留一点原曲的影子,大幅度重写(魔改)。" ) btn = gr.Button("🚀 开始制作", variant="primary", size="lg") with gr.Column(scale=1): log_box = gr.Textbox(label="运行日志", lines=12) out_file = gr.File(label="下载结果 (ZIP)") # 将 similarity_in 加入输入列表 btn.click(run_one_click_copy, inputs=[file_in, similarity_in], outputs=[log_box, out_file]) if __name__ == "__main__": demo.launch()