|
|
import io
import json
import logging
import os
import subprocess
import time
from io import BytesIO
from subprocess import getoutput
from typing import Tuple
from urllib.parse import quote

import gradio as gr
import librosa
import numpy as np
import requests
import soundfile
from logmmse import logmmse

from inference import slicer
from inference.infer_tool import Svc
|
|
|
|
|
# Only let warnings and above through from the 'numba' logger.
logging.getLogger('numba').setLevel(logging.WARNING)

# Checkpoint and configuration files handed to the Svc constructor in infer().
model_sing = "./G_5000.pth"
config_name = "./config.json"

# Voice name offered in the UI dropdown -> model key that infer() dispatches on.
sid_map = {"plw": "model_sing"}

# Mark the bundled ./pget binary as executable so it can be run later.
os.system('chmod +x ./pget')
|
|
class YukieGradio:
    """Gradio front end for the voice-conversion demo.

    Instantiating the class builds ``self.UI`` (a ``gr.Blocks``) containing a
    single "Basic" tab and connects the submit button to the module-level
    ``infer`` function. Each widget is kept as an attribute on the instance.
    """

    def __init__(self):
        self.UI = gr.Blocks()

        # Components are created in display order inside the nested contexts.
        with self.UI, gr.Tabs(), gr.TabItem("Basic"):
            gr.Markdown(value="""
偷的界面,参考LICENSE """)

            # --- inputs -------------------------------------------------
            voice_choices = ["plw"]
            self.sid = gr.Dropdown(
                label="音色",
                choices=voice_choices,
                value="plw",
                interactive=True,
            )

            device_choices = ["cuda", "cpu"]
            self.dev = gr.Dropdown(
                label="设备(云端一般请勿切换,使用默认值即可)",
                choices=device_choices,
                value="cpu",
                interactive=True,
            )

            self.inMic = gr.Textbox(label='url/search string')
            self.inAudio = gr.Audio(label="or 上传音频")
            self.needLogmmse = gr.Checkbox(label="是否使用自带降噪")
            self.slice_db = gr.Slider(
                label="切片阈值(较嘈杂时-30,保留呼吸声时-50,一般默认-40)",
                maximum=0,
                minimum=-60,
                step=1,
                value=-40,
            )
            self.vcTransform = gr.Number(
                label="升降调(整数,可以正负,半音数量,升高八度就是12)",
                value=0,
            )
            self.vcSubmit = gr.Button("转换", variant="primary")

            # --- outputs ------------------------------------------------
            self.outVcText = gr.Textbox(
                label="音高平均偏差半音数量,体现转换音频的跑调情况(一般小于0.5)",
            )
            self.outAudio = gr.Audio(
                source="upload",
                type="numpy",
                label="Output Audio",
            )
            self.f0_image = gr.Image(
                label="f0曲线,蓝色为输入音高,橙色为合成音频的音高(代码有误差)",
            )

            gr.Markdown(value="""
## 注意
如果要在本地使用该demo,请使用 `git lfs clone https://huggingface.co/spaces/yukie/yukie-sovits3`克隆该仓库([简单教程](https://huggingface.co/spaces/yukie/yukie-sovits3/edit/main/local.md))
""")

            # Argument order here must match the parameter order of infer().
            click_inputs = [
                self.inMic,
                self.inAudio,
                self.vcTransform,
                self.slice_db,
                self.needLogmmse,
                self.sid,
                self.dev,
            ]
            click_outputs = [self.outVcText, self.outAudio, self.f0_image]
            self.vcSubmit.click(
                infer,
                inputs=click_inputs,
                outputs=click_outputs,
                api_name="go",
            )
|
|
|
|
|
def download_audio(url, timeout=60):
    """Download an audio file and return it as 16-bit integer samples.

    Args:
        url: Address of the audio resource to fetch.
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        A ``(sample_rate, samples)`` tuple, where ``samples`` is an ``int16``
        NumPy array.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    response = requests.get(url, timeout=timeout)
    # Surface HTTP failures directly instead of feeding an error page to the
    # audio decoder below.
    response.raise_for_status()

    # sr=None asks librosa to keep the file's own sampling rate.
    y, sr = librosa.load(BytesIO(response.content), sr=None)

    # NOTE(review): the in-memory WAV round-trip is kept from the original;
    # presumably it normalises the samples through soundfile's WAV encoding —
    # confirm before simplifying it away.
    with BytesIO() as wav_bytes:
        soundfile.write(wav_bytes, y, sr, format='wav')
        wav_bytes.seek(0)
        data, sr = soundfile.read(wav_bytes)

    data = np.asarray(data * 32767, dtype=np.int16)
    return sr, data
|
|
|
|
|
def _first_short_video_id(search_results):
    """Return the ``videoId`` of the first hit under ten minutes, else ``None``."""
    for item in search_results:
        duration = item.get("duration_raw")
        if not duration:
            continue
        parts = duration.split(':')
        # Assumes "[h:]mm:ss"; three or more fields are treated as too long.
        if len(parts) >= 3:
            continue
        try:
            leading = int(parts[0])
        except ValueError:
            # Unparseable duration: skip this hit rather than abort the search.
            continue
        if leading < 10:
            return item.get("id", {}).get("videoId")
    return None


def _best_audio_only_mp4(formats):
    """Return the audio-only mp4 format with the highest bitrate, else ``None``."""
    candidates = [
        fmt for fmt in formats
        if fmt.get("hasVideo") is False
        and fmt.get("hasAudio") is True
        and fmt.get("container") == "mp4"
    ]
    if not candidates:
        return None
    # A missing bitrate counts as 0 so the comparison never sees None.
    return max(candidates, key=lambda fmt: fmt.get("audioBitrate") or 0)


def downloadTubeUpload(query, timeout=60):
    """Search for a track, cut a 60 s excerpt, and upload it to lalal.ai.

    The first search hit shorter than ten minutes is downloaded with ``./pget``
    as ``temp.mp4``. A 60 second excerpt starting at the midpoint is copied to
    ``output.mp4`` with ffmpeg and uploaded with curl.

    Args:
        query: Free-text search string.
        timeout: Seconds to wait on each HTTP request made with ``requests``.

    Returns:
        The ``id`` field of the upload response, or ``None`` when no suitable
        video or audio format was found.

    Raises:
        subprocess.CalledProcessError: If pget, ffprobe, ffmpeg or curl exits
            with a non-zero status.
        ValueError: If ffprobe's output is not a number or the upload response
            is not valid JSON.
    """
    search_url = f"https://draw-8fj.begin.app/api/search/{quote(query)}"
    search_response = requests.get(search_url, timeout=timeout).json()
    print('1=>', search_response)

    video_id = _first_short_video_id(search_response)
    print('1-r', video_id)
    if not video_id:
        return None

    formats = requests.get(
        f"https://draw-8fj.begin.app/api/info/{video_id}", timeout=timeout)
    if not formats.ok:
        # Retry once against the staging deployment.
        formats = requests.get(
            f"https://draw-8fj-staging.begin.app/api/info/{video_id}",
            timeout=timeout)
    available_formats = formats.json().get("formats", [])
    print(available_formats)

    best_audio_format = _best_audio_only_mp4(available_formats)
    print(best_audio_format)
    if not best_audio_format:
        return None

    aurl = best_audio_format["url"]
    print(aurl)

    # Clear leftovers from an earlier failed run so a stale file is never
    # processed or uploaded in place of the new download.
    for stale in ("temp.mp4", "output.mp4"):
        try:
            os.remove(stale)
        except FileNotFoundError:
            pass

    # The URL comes from a remote service, so it is passed as a separate
    # argument and never interpolated into a shell command line.
    subprocess.run(["./pget", "-o", "temp.mp4", "-p", "4", aurl], check=True)

    probe = subprocess.run(
        ["ffprobe", "-v", "error",
         "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1",
         "temp.mp4"],
        capture_output=True, text=True, check=True)
    duration = float(probe.stdout.strip())

    # Take the excerpt from the middle of the track.
    start_time = max(0, duration / 2)
    subprocess.run(
        ["ffmpeg", "-y", "-i", "temp.mp4",
         "-ss", str(start_time), "-t", "60",
         "-c", "copy", "output.mp4"],
        check=True)

    upload = subprocess.run(
        ["curl", "--url", "https://www.lalal.ai/api/upload/",
         "--data-binary", "@output.mp4",
         "--header", "Content-Disposition: attachment; filename=output.mp4",
         "-s"],
        capture_output=True, text=True, check=True)
    moutput = upload.stdout
    print(moutput)
    upload_response = json.loads(moutput)
    return upload_response.get("id")
|
|
|
|
|
def split_file(file_id, timeout=60):
    """Delete the local temp files and ask lalal.ai to split an uploaded file.

    Args:
        file_id: Identifier of a previously uploaded file.
        timeout: Seconds to wait on the HTTP request before giving up.

    Raises:
        RuntimeError: If the service reports ``status == "error"``; the
            message is the service's ``error`` field.
    """
    # Best-effort cleanup, done in-process rather than by shelling out to `rm`.
    for leftover in ("temp.mp4", "output.mp4"):
        try:
            os.remove(leftover)
        except FileNotFoundError:
            pass
        except OSError as exc:
            # Cleanup must not block the split request; report and carry on.
            print(f"could not remove {leftover}: {exc}")

    url_for_split = "https://www.lalal.ai/api/preview/"
    query_args = {'id': file_id, 'splitter': "phoenix"}
    response = requests.post(url_for_split, data=query_args, timeout=timeout)
    split_result = response.json()
    if split_result["status"] == "error":
        raise RuntimeError(split_result["error"])
|
|
|
|
|
def check_file(file_id, poll_interval=30, timeout=60):
    """Poll lalal.ai until the split task for ``file_id`` finishes.

    Args:
        file_id: Identifier of the uploaded file whose task is being tracked.
        poll_interval: Seconds to sleep between status checks.
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        A ``(stem_track_url, back_track_url)`` tuple taken from the
        ``preview`` section of the final response.

    Raises:
        RuntimeError: If the response or the task itself reports an error.
    """
    url_for_check = "https://www.lalal.ai/api/check/"
    query_args = {'id': file_id}

    # Ensures the "Queue up..." notice is printed only once.
    is_queueup = False

    while True:
        response = requests.get(
            url_for_check, params=query_args, timeout=timeout)
        check_result = response.json()

        if check_result["status"] == "error":
            raise RuntimeError(check_result["error"])

        task = check_result["task"]
        task_state = task["state"]

        if task_state == "error":
            raise RuntimeError(task["error"])

        if task_state == "progress":
            progress = int(task["progress"])
            if progress == 0 and not is_queueup:
                print("Queue up...")
                is_queueup = True
            elif progress > 0:
                print(f"Progress: {progress}%")

        if task_state == "success":
            preview = check_result["preview"]
            return preview["stem_track"], preview["back_track"]

        time.sleep(poll_interval)
|
|
def _resolve_input_audio(inMic, inAudio):
    """Return ``(sampling_rate, samples)`` for the request, or ``None``.

    An uploaded clip takes priority. Otherwise text starting with "http" is
    downloaded directly, and any other non-empty text is used as a search
    query. ``None`` means there is nothing usable to convert.
    """
    if inAudio is not None:
        return inAudio

    # An empty textbox may arrive as "" rather than None; treat both as absent.
    text = (inMic or "").strip()
    if not text:
        return None

    if text.startswith("http"):
        return download_audio(text)

    file_id = downloadTubeUpload(text)
    if file_id is None:
        # The search found nothing suitable, so there is nothing to split.
        return None
    split_file(file_id)
    stem_track_url, _back_track_url = check_file(file_id)
    return download_audio(stem_track_url)


def infer(inMic, inAudio, transform, slice_db, lm, sid, dev):
    """Convert the supplied audio to the selected voice.

    Args:
        inMic: URL or search string from the textbox.
        inAudio: Uploaded audio as ``(sampling_rate, samples)``, or ``None``.
        transform: Pitch shift forwarded to the model.
        slice_db: Threshold passed to the slicer as ``db_thresh``.
        lm: When truthy, run ``logmmse`` on the input before slicing.
        sid: Voice name; must be a key of ``sid_map``.
        dev: Device string forwarded to ``Svc``.

    Returns:
        A 3-tuple ``(status_text, (32000, int16_samples), image_update)``, one
        value per output component. When no usable input is available it is
        ``(message, None, None)``.

    Raises:
        KeyError: If ``sid`` is not in ``sid_map``.
        ValueError: If the mapped model key has no loader.
    """
    source = _resolve_input_audio(inMic, inAudio)
    if source is None:
        # Three outputs are wired to this callback, so return three values.
        return "请上传一段音频后再次尝试", None, None
    sampling_rate, inaudio = source

    print("start inference")
    start_time = time.time()

    # Scale integer PCM by its dtype maximum; float input has no integer range
    # to scale by, so it is only cast.
    if np.issubdtype(inaudio.dtype, np.integer):
        inaudio = (inaudio / np.iinfo(inaudio.dtype).max).astype(np.float32)
    else:
        inaudio = inaudio.astype(np.float32)
    if len(inaudio.shape) > 1:
        inaudio = librosa.to_mono(inaudio.transpose(1, 0))
    if sampling_rate != 32000:
        inaudio = librosa.resample(
            inaudio, orig_sr=sampling_rate, target_sr=32000)
    if lm:
        inaudio = logmmse(inaudio, 32000)

    ori_wav_path = "tmp_ori.wav"
    soundfile.write(ori_wav_path, inaudio, 32000, format="wav")
    chunks = slicer.cut(ori_wav_path, db_thresh=slice_db)
    audio_data, audio_sr = slicer.chunks2audio(ori_wav_path, chunks)

    model_key = sid_map[sid]
    if model_key == "model_sing":
        svc_model = Svc(model_sing, config_name, dev=dev)
    else:
        # Fail with a clear message instead of an unbound-variable error below.
        raise ValueError(f"no model configured for voice {sid!r}")

    segments = []
    for slice_tag, data in audio_data:
        if slice_tag:
            # Flagged chunks (presumably silence — confirm against the slicer)
            # are replaced by zeros of the matching output length.
            length = int(
                np.ceil(len(data) / audio_sr * svc_model.target_sample))
            segments.append(np.zeros(length))
            continue
        raw_path = io.BytesIO()
        soundfile.write(raw_path, data, audio_sr, format="wav")
        raw_path.seek(0)
        out_audio, _out_str = svc_model.infer("group", transform, raw_path)
        segments.append(out_audio.cpu().numpy())

    # Join whole arrays instead of extending a Python list sample by sample.
    audio = np.concatenate(segments) if segments else np.zeros(0)
    # Clip before the cast so full-scale samples cannot overflow int16.
    audio = np.clip(audio * 32768.0, -32768, 32767).astype('int16')
    used_time = time.time() - start_time

    out_wav_path = "tmp.wav"
    soundfile.write(out_wav_path, audio, 32000, format="wav")

    mistake, var = svc_model.calc_error(ori_wav_path, out_wav_path, transform)
    # Return value unused; presumably f0_plt writes the "temp.jpg" shown
    # below — confirm against infer_tool.
    svc_model.f0_plt(ori_wav_path, out_wav_path, transform)
    out_str = ("Success! total use time:{}s\n半音偏差:{}\n半音方差:{}".format(
        used_time, mistake, var))

    return out_str, (32000, audio), gr.Image.update("temp.jpg")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Build the interface and start serving it when run as a script.
    app = YukieGradio()
    app.UI.launch()
|
|
|