|
|
import gradio as gr |
|
|
import requests |
|
|
import os |
|
|
import time |
|
|
import zipfile |
|
|
import shutil |
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SUNO_API_KEY = "2335a91def6e8c7b851c286ffd80f0df" |
|
|
UPLOAD_URL = "https://kieai.redpandaai.co/api/file-stream-upload" |
|
|
COVER_URL = "https://api.kie.ai/api/v1/generate/upload-cover" |
|
|
QUERY_URL = "https://api.kie.ai/api/v1/generate/record-info" |
|
|
DUMMY_CALLBACK = "https://example.com/callback" |
|
|
|
|
|
|
|
|
MAX_WORKERS = 5 |
|
|
|
|
|
def log(msg): |
|
|
return f"[{time.strftime('%H:%M:%S')}] {msg}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_file(file_obj): |
|
|
try: |
|
|
file_path = file_obj.name |
|
|
file_name = os.path.basename(file_path) |
|
|
headers = {"Authorization": f"Bearer {SUNO_API_KEY}"} |
|
|
files = {'file': (file_name, open(file_path, 'rb'))} |
|
|
data = {'uploadPath': 'suno-inst-copy'} |
|
|
|
|
|
res = requests.post(UPLOAD_URL, headers=headers, files=files, data=data) |
|
|
res_json = res.json() |
|
|
|
|
|
url = res_json.get('data', {}).get('downloadUrl') or res_json.get('data', {}).get('fileUrl') |
|
|
if url: return {"status": "success", "name": file_name, "url": url} |
|
|
return {"status": "failed", "name": file_name, "msg": "No URL returned"} |
|
|
except Exception as e: |
|
|
return {"status": "error", "name": os.path.basename(file_obj.name), "msg": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def submit_copy_task(upload_data, similarity_val): |
|
|
if upload_data['status'] != 'success': return None |
|
|
|
|
|
|
|
|
universal_prompt = "Instrumental, High Fidelity, Cinematic, Clear Production, Masterpiece, Original Vibe" |
|
|
|
|
|
payload = { |
|
|
"uploadUrl": upload_data['url'], |
|
|
"callBackUrl": DUMMY_CALLBACK, |
|
|
"model": "V5", |
|
|
|
|
|
"instrumental": True, |
|
|
"prompt": universal_prompt, |
|
|
"lyrics": "", |
|
|
"customMode": True, |
|
|
"title": f"{upload_data['name']} (Sim_{int(similarity_val*100)}%)", |
|
|
|
|
|
|
|
|
"styleWeight": 0.95, |
|
|
"audioWeight": similarity_val |
|
|
} |
|
|
|
|
|
try: |
|
|
res = requests.post(COVER_URL, headers={"Authorization": f"Bearer {SUNO_API_KEY}", "Content-Type": "application/json"}, json=payload) |
|
|
data = res.json() |
|
|
if data.get('code') == 200: |
|
|
return {"id": data['data']['taskId'], "name": upload_data['name']} |
|
|
return {"failed": True, "name": upload_data['name'], "msg": data.get('msg')} |
|
|
except Exception as e: |
|
|
return {"failed": True, "name": upload_data['name'], "msg": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_one_click_copy(files, similarity): |
|
|
logs = [log(f"🚀 启动复刻流程 | 目标相似度: {similarity}")] |
|
|
if not files: return "\n".join(logs), None |
|
|
|
|
|
|
|
|
logs.append(log(f"⬆️ 正在上传 {len(files)} 个音频...")) |
|
|
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: |
|
|
uploads = list(pool.map(upload_file, files)) |
|
|
|
|
|
|
|
|
logs.append(log(f"⚙️ 提交任务...")) |
|
|
tasks = [] |
|
|
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: |
|
|
|
|
|
futures = [pool.submit(submit_copy_task, up, similarity) for up in uploads] |
|
|
for f in as_completed(futures): |
|
|
res = f.result() |
|
|
if res and not res.get('failed'): |
|
|
tasks.append(res) |
|
|
logs.append(log(f" ✅ 任务建立: {res['name']}")) |
|
|
else: |
|
|
logs.append(log(f" ❌ 建立失败: {res.get('name')} - {res.get('msg')}")) |
|
|
|
|
|
if not tasks: |
|
|
logs.append(log("❌ 无有效任务,流程结束")) |
|
|
return "\n".join(logs), None |
|
|
|
|
|
|
|
|
logs.append(log(f"⏳ 等待处理 (共 {len(tasks)} 个)...")) |
|
|
yield "\n".join(logs), None |
|
|
|
|
|
completed_urls = [] |
|
|
headers = {"Authorization": f"Bearer {SUNO_API_KEY}"} |
|
|
start_time = time.time() |
|
|
|
|
|
while len(completed_urls) < len(tasks): |
|
|
if time.time() - start_time > 1200: break |
|
|
|
|
|
pending = False |
|
|
for task in tasks: |
|
|
if any(c['id'] == task['id'] for c in completed_urls): continue |
|
|
if task.get('failed_final'): continue |
|
|
|
|
|
try: |
|
|
r = requests.get(QUERY_URL, headers=headers, params={"taskId": task['id']}).json() |
|
|
status = r.get('data', {}).get('status') |
|
|
|
|
|
if status == 'SUCCESS': |
|
|
data = r['data']['response'].get('sunoData', []) |
|
|
if data: |
|
|
info = {"id": task['id'], "url": data[0]['audioUrl'], "name": task['name']} |
|
|
completed_urls.append(info) |
|
|
logs.append(log(f" 🎉 完成: {task['name']}")) |
|
|
yield "\n".join(logs), None |
|
|
elif status in ['GENERATE_AUDIO_FAILED', 'SENSITIVE_WORD_ERROR']: |
|
|
task['failed_final'] = True |
|
|
logs.append(log(f" ❌ 平台生成失败: {task['name']}")) |
|
|
yield "\n".join(logs), None |
|
|
else: |
|
|
pending = True |
|
|
except: pending = True |
|
|
|
|
|
if pending: time.sleep(5) |
|
|
else: break |
|
|
|
|
|
|
|
|
logs.append(log("📦 打包文件中...")) |
|
|
temp_dir = f"Inst_Copy_{int(time.time())}" |
|
|
os.makedirs(temp_dir, exist_ok=True) |
|
|
|
|
|
cnt = 0 |
|
|
for item in completed_urls: |
|
|
try: |
|
|
with requests.get(item['url'], stream=True) as r: |
|
|
fname = f"{os.path.splitext(item['name'])[0]}_Remix.mp3" |
|
|
with open(os.path.join(temp_dir, fname), 'wb') as f: |
|
|
f.write(r.content) |
|
|
cnt += 1 |
|
|
except: pass |
|
|
|
|
|
if cnt > 0: |
|
|
zip_path = f"{temp_dir}.zip" |
|
|
with zipfile.ZipFile(zip_path, 'w') as z: |
|
|
for root, _, fs in os.walk(temp_dir): |
|
|
for f in fs: z.write(os.path.join(root, f), f) |
|
|
shutil.rmtree(temp_dir) |
|
|
logs.append(log(f"✅ 完成!成功下载 {cnt} 个文件")) |
|
|
yield "\n".join(logs), zip_path |
|
|
else: |
|
|
shutil.rmtree(temp_dir) |
|
|
yield "\n".join(logs), None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(title="Suno 纯音乐复刻专家") as demo: |
|
|
gr.Markdown("## 🎹 Suno 纯音乐复刻专家 (Instrumental Copy)") |
|
|
gr.Markdown("上传音频 -> 调节相似度 -> 一键生成。") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
file_in = gr.File(label="拖入音频文件 (支持批量)", file_count="multiple", height=200) |
|
|
|
|
|
|
|
|
similarity_in = gr.Slider( |
|
|
minimum=0.1, |
|
|
maximum=0.99, |
|
|
value=0.95, |
|
|
step=0.01, |
|
|
label="🎚️ 复刻相似度 (Audio Weight)", |
|
|
info="⚠️ 调节说明:\n" |
|
|
"【0.90 - 0.99】:高保真复刻,几乎和原曲一样,仅音质/混音微调。\n" |
|
|
"【0.60 - 0.80】:保留旋律框架,但在乐器和氛围上有明显变化(二创)。\n" |
|
|
"【0.30 - 0.50】:仅保留一点原曲的影子,大幅度重写(魔改)。" |
|
|
) |
|
|
|
|
|
btn = gr.Button("🚀 开始制作", variant="primary", size="lg") |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
log_box = gr.Textbox(label="运行日志", lines=12) |
|
|
out_file = gr.File(label="下载结果 (ZIP)") |
|
|
|
|
|
|
|
|
btn.click(run_one_click_copy, inputs=[file_in, similarity_in], outputs=[log_box, out_file]) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch() |