File size: 8,442 Bytes
6b44ecd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
import gradio as gr
import requests
import os
import time
import zipfile
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
# ==========================================
# ⚙️ 核心配置
# ==========================================
# NOTE(security): a credential is committed here in plain text. Supply the key
# through the SUNO_API_KEY environment variable instead; the literal below is
# kept only as a backward-compatible fallback and should be rotated/removed.
SUNO_API_KEY = os.environ.get("SUNO_API_KEY") or "2335a91def6e8c7b851c286ffd80f0df"

# Endpoints used by this module: file upload, cover-task submission, task query.
UPLOAD_URL = "https://kieai.redpandaai.co/api/file-stream-upload"
COVER_URL = "https://api.kie.ai/api/v1/generate/upload-cover"
QUERY_URL = "https://api.kie.ai/api/v1/generate/record-info"
# Placeholder sent as "callBackUrl" when submitting a task; results are
# retrieved by polling QUERY_URL instead.
DUMMY_CALLBACK = "https://example.com/callback"

# Maximum number of concurrent worker threads
MAX_WORKERS = 5
def log(msg):
    """Return *msg* prefixed with the current local time as ``[HH:MM:SS]``."""
    stamp = time.strftime("%H:%M:%S")
    return f"[{stamp}] {msg}"
# ==========================================
# 🛠️ 基础功能
# ==========================================
def upload_file(file_obj, *, timeout=120):
    """Upload one local file to ``UPLOAD_URL`` and report the outcome as a dict.

    Args:
        file_obj: Either a filesystem path (``str`` / ``os.PathLike``) or an
            object that exposes the path as its ``.name`` attribute.
        timeout: Seconds passed to ``requests.post`` so a stalled connection
            cannot block the worker thread forever.

    Returns:
        ``{"status": "success", "name": ..., "url": ...}`` when the response
        carries a ``downloadUrl`` or ``fileUrl``;
        ``{"status": "failed", "name": ..., "msg": ...}`` when it does not;
        ``{"status": "error", "name": ..., "msg": ...}`` when an exception
        was raised along the way.
    """
    # Resolved before anything can fail so the error record can always name
    # the file, even when the path itself cannot be determined.
    file_name = "unknown"
    try:
        if isinstance(file_obj, (str, os.PathLike)):
            file_path = os.fspath(file_obj)
        else:
            file_path = file_obj.name
        file_name = os.path.basename(file_path)

        # The context manager guarantees the handle is closed once the
        # request has been sent (or has failed).
        with open(file_path, 'rb') as fh:
            res = requests.post(
                UPLOAD_URL,
                headers={"Authorization": f"Bearer {SUNO_API_KEY}"},
                files={'file': (file_name, fh)},
                data={'uploadPath': 'suno-inst-copy'},
                timeout=timeout,
            )
        res_json = res.json()
        # "data" may be present but null; treat that like a missing payload.
        payload = res_json.get('data') or {}
        url = payload.get('downloadUrl') or payload.get('fileUrl')
        if url:
            return {"status": "success", "name": file_name, "url": url}
        return {"status": "failed", "name": file_name, "msg": "No URL returned"}
    except Exception as e:
        # Worker boundary: callers expect a status record, not an exception.
        return {"status": "error", "name": file_name, "msg": str(e)}
# ==========================================
# 🎹 核心:复刻任务提交 (接收相似度参数)
# ==========================================
def submit_copy_task(upload_data, similarity_val, *, timeout=60):
    """Submit one instrumental cover task for a previously uploaded file.

    Args:
        upload_data: Result record from the upload step; must contain
            ``"status"`` and ``"name"``, plus ``"url"`` when the status is
            ``"success"``.
        similarity_val: Value sent as ``audioWeight`` and shown, as a
            percentage, in the task title.
        timeout: Seconds passed to ``requests.post``.

    Returns:
        ``{"id": task_id, "name": ...}`` when the API answers with code 200,
        otherwise ``{"failed": True, "name": ..., "msg": ...}``. A failed
        upload also yields a failure record (rather than ``None``) so callers
        can report which file was skipped and why.
    """
    if upload_data.get('status') != 'success':
        return {
            "failed": True,
            "name": upload_data.get('name'),
            "msg": upload_data.get('msg') or "upload failed",
        }

    # Generic high-fidelity prompt used for every task.
    universal_prompt = "Instrumental, High Fidelity, Cinematic, Clear Production, Masterpiece, Original Vibe"
    # round() instead of int(): int() truncates, so e.g. 0.57 * 100
    # (56.99999999999999) would be labelled 56%.
    percent = round(similarity_val * 100)
    payload = {
        "uploadUrl": upload_data['url'],
        "callBackUrl": DUMMY_CALLBACK,
        "model": "V5",
        "instrumental": True,          # instrumental only
        "prompt": universal_prompt,
        "lyrics": "",                  # no lyrics
        "customMode": True,
        "title": f"{upload_data['name']} (Sim_{percent}%)",
        "styleWeight": 0.95,           # fixed, high style weight
        "audioWeight": similarity_val  # audio weight comes from the caller
    }
    try:
        res = requests.post(
            COVER_URL,
            headers={"Authorization": f"Bearer {SUNO_API_KEY}", "Content-Type": "application/json"},
            json=payload,
            timeout=timeout,
        )
        data = res.json()
        if data.get('code') == 200:
            return {"id": data['data']['taskId'], "name": upload_data['name']}
        return {"failed": True, "name": upload_data['name'], "msg": data.get('msg')}
    except Exception as e:
        # Worker boundary: network and parsing problems become a failure record.
        return {"failed": True, "name": upload_data['name'], "msg": str(e)}
# ==========================================
# 🔄 流程控制
# ==========================================
def run_one_click_copy(files, similarity):
    """Upload files, submit cover tasks, poll them, and zip the finished audio.

    This is a generator: every ``yield`` produces a
    ``(log_text, zip_path_or_None)`` pair. Because a ``return value`` inside
    a generator is never delivered to the consumer, every early exit yields
    its final state first and then returns.

    Args:
        files: Uploaded file objects/paths as accepted by ``upload_file``.
        similarity: Value forwarded to ``submit_copy_task``.

    Yields:
        Tuples of the accumulated log text and, on the final successful step,
        the path of the created ZIP archive (otherwise ``None``).

    Side effects:
        Creates a working directory ``Inst_Copy_<timestamp>`` and a ZIP of the
        same name in the current working directory; the directory is always
        removed before the function finishes.
    """
    request_timeout = 30   # seconds per status query
    download_timeout = 120  # seconds per audio download
    poll_interval = 5      # seconds between polling rounds
    poll_budget = 1200     # total seconds to wait for all tasks

    logs = [log(f"🚀 启动复刻流程 | 目标相似度: {similarity}")]

    def snapshot(zip_file=None):
        # Current log text plus the (optional) downloadable archive path.
        return "\n".join(logs), zip_file

    if not files:
        yield snapshot()
        return

    # 1. Upload
    logs.append(log(f"⬆️ 正在上传 {len(files)} 个音频..."))
    yield snapshot()
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        uploads = list(pool.map(upload_file, files))

    # 2. Submit
    logs.append(log(f"⚙️ 提交任务..."))
    tasks = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        # Remember which upload each future belongs to so a missing result
        # can still be reported with the file's name and error message.
        futures = {pool.submit(submit_copy_task, up, similarity): up for up in uploads}
        for fut in as_completed(futures):
            res = fut.result()
            if res and not res.get('failed'):
                tasks.append(res)
                logs.append(log(f" ✅ 任务建立: {res['name']}"))
            else:
                # submit_copy_task may return None for an upload that did not
                # succeed; fall back to the upload record in that case.
                info = res or futures[fut] or {}
                logs.append(log(f" ❌ 建立失败: {info.get('name')} - {info.get('msg')}"))

    if not tasks:
        logs.append(log("❌ 无有效任务,流程结束"))
        yield snapshot()
        return

    # 3. Poll
    logs.append(log(f"⏳ 等待处理 (共 {len(tasks)} 个)..."))
    yield snapshot()

    finished = []
    finished_ids = set()
    headers = {"Authorization": f"Bearer {SUNO_API_KEY}"}
    deadline = time.monotonic() + poll_budget
    while len(finished) < len(tasks):
        if time.monotonic() > deadline:
            logs.append(log("⏰ 等待超时,停止轮询"))
            yield snapshot()
            break

        pending = False
        for task in tasks:
            if task['id'] in finished_ids or task.get('failed_final'):
                continue

            # Only the network call and response parsing are guarded. The
            # yields stay outside the try so closing the generator
            # (GeneratorExit) is never swallowed.
            done_info = None
            try:
                r = requests.get(
                    QUERY_URL,
                    headers=headers,
                    params={"taskId": task['id']},
                    timeout=request_timeout,
                ).json()
                status = r.get('data', {}).get('status')
                if status == 'SUCCESS':
                    tracks = r['data']['response'].get('sunoData', [])
                    if tracks:
                        # Only the first returned track is kept.
                        done_info = {"id": task['id'], "url": tracks[0]['audioUrl'], "name": task['name']}
            except (requests.RequestException, ValueError, KeyError,
                    IndexError, TypeError, AttributeError):
                # Failed or malformed query: try again next round.
                pending = True
                continue

            if status == 'SUCCESS':
                # SUCCESS without any track is neither finished nor pending.
                if done_info is not None:
                    finished.append(done_info)
                    finished_ids.add(task['id'])
                    logs.append(log(f" 🎉 完成: {task['name']}"))
                    yield snapshot()
            elif status in ('GENERATE_AUDIO_FAILED', 'SENSITIVE_WORD_ERROR'):
                task['failed_final'] = True
                logs.append(log(f" ❌ 平台生成失败: {task['name']}"))
                yield snapshot()
            else:
                pending = True

        if not pending:
            break
        time.sleep(poll_interval)

    # 4. Download and package
    logs.append(log("📦 打包文件中..."))
    temp_dir = f"Inst_Copy_{int(time.time())}"
    os.makedirs(temp_dir, exist_ok=True)
    zip_path = None
    try:
        saved = []
        used_names = set()
        for item in finished:
            stem = os.path.splitext(item['name'])[0]
            fname = f"{stem}_Remix.mp3"
            # Two inputs with the same base name must not overwrite each other.
            suffix = 1
            while fname in used_names:
                suffix += 1
                fname = f"{stem}_Remix_{suffix}.mp3"
            used_names.add(fname)

            dest = os.path.join(temp_dir, fname)
            try:
                with requests.get(item['url'], stream=True, timeout=download_timeout) as r:
                    # Do not save an HTTP error page as an .mp3 file.
                    r.raise_for_status()
                    with open(dest, 'wb') as out:
                        for chunk in r.iter_content(chunk_size=65536):
                            out.write(chunk)
                saved.append(fname)
            except (requests.RequestException, OSError) as e:
                # Best effort: report the failure and continue with the rest.
                logs.append(log(f" ⚠️ 下载失败: {item['name']} - {e}"))

        if saved:
            zip_path = f"{temp_dir}.zip"
            with zipfile.ZipFile(zip_path, 'w') as z:
                # Only fully downloaded files are archived.
                for fname in saved:
                    z.write(os.path.join(temp_dir, fname), fname)
            logs.append(log(f"✅ 完成!成功下载 {len(saved)} 个文件"))
    finally:
        # Always remove the working directory, including on errors.
        shutil.rmtree(temp_dir, ignore_errors=True)

    yield snapshot(zip_path)
# ==========================================
# 🖥️ 界面构建
# ==========================================
# Two-column layout: inputs and the start button on the left, the running log
# and the downloadable result on the right.
with gr.Blocks(title="Suno 纯音乐复刻专家") as demo:
    gr.Markdown("## 🎹 Suno 纯音乐复刻专家 (Instrumental Copy)")
    gr.Markdown("上传音频 -> 调节相似度 -> 一键生成。")

    with gr.Row():
        with gr.Column(scale=1):
            file_in = gr.File(
                label="拖入音频文件 (支持批量)",
                file_count="multiple",
                height=200,
            )
            # Slider whose value is passed to the handler as the similarity.
            similarity_in = gr.Slider(
                label="🎚️ 复刻相似度 (Audio Weight)",
                info=(
                    "⚠️ 调节说明:\n"
                    "【0.90 - 0.99】:高保真复刻,几乎和原曲一样,仅音质/混音微调。\n"
                    "【0.60 - 0.80】:保留旋律框架,但在乐器和氛围上有明显变化(二创)。\n"
                    "【0.30 - 0.50】:仅保留一点原曲的影子,大幅度重写(魔改)。"
                ),
                minimum=0.1,
                maximum=0.99,
                step=0.01,
                value=0.95,
            )
            btn = gr.Button("🚀 开始制作", variant="primary", size="lg")

        with gr.Column(scale=1):
            log_box = gr.Textbox(label="运行日志", lines=12)
            out_file = gr.File(label="下载结果 (ZIP)")

    # The handler receives the files and the slider value, and fills the log
    # box and the result file.
    btn.click(
        fn=run_one_click_copy,
        inputs=[file_in, similarity_in],
        outputs=[log_box, out_file],
    )
# Start the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()