# Page header captured together with the file (not part of the program):
# sovits3 / app.py
# mzltest's picture
# Update app.py
# c457f93
import io
from io import BytesIO
import gradio as gr
import librosa
import numpy as np
import soundfile
from inference import slicer
from inference.infer_tool import Svc
import logging
from logmmse import logmmse
from typing import Tuple
import time
import requests
import os,json
from subprocess import getoutput
from urllib.parse import quote
# Only let warnings and errors through from the "numba" logger.
logging.getLogger("numba").setLevel(logging.WARNING)

# Checkpoint and configuration paths handed to Svc when a request is served.
model_sing = "./G_5000.pth"
# model_talk = "logs/32k/talk1.pth"
config_name = "./config.json"

# Voice name offered in the UI -> key naming which checkpoint variable to load.
sid_map = {"plw": "model_sing"}

# Make sure the ./pget helper used for downloads carries the executable bit.
os.system("chmod +x ./pget")
class YukieGradio:
    """Gradio Blocks front end for the voice-conversion demo.

    Constructing an instance lays out every input and output widget inside a
    single "Basic" tab and wires the convert button to the module-level
    ``infer`` function (exposed to API clients under the name "go").
    The assembled interface is available as ``self.UI``.
    """

    def __init__(self):
        self.UI = gr.Blocks()
        # One ``with`` statement enters the three contexts in order, which
        # nests them exactly like separate blocks: Blocks > Tabs > TabItem.
        with self.UI, gr.Tabs(), gr.TabItem("Basic"):
            gr.Markdown(value="""
偷的界面,参考LICENSE """)

            # ---- inputs -------------------------------------------------
            # Voice selector; its value is looked up in ``sid_map`` by infer.
            self.sid = gr.Dropdown(
                label="音色",
                choices=["plw"],
                value="plw",
                interactive=True,
            )
            # Device string forwarded to the model constructor.
            self.dev = gr.Dropdown(
                label="设备(云端一般请勿切换,使用默认值即可)",
                choices=["cuda", "cpu"],
                value="cpu",
                interactive=True,
            )
            # Despite the attribute name this is a text field: a URL or a
            # search string.
            self.inMic = gr.Textbox(label='url/search string')
            self.inAudio = gr.Audio(label="or 上传音频")
            self.needLogmmse = gr.Checkbox(label="是否使用自带降噪")
            self.slice_db = gr.Slider(
                label="切片阈值(较嘈杂时-30,保留呼吸声时-50,一般默认-40)",
                maximum=0,
                minimum=-60,
                step=1,
                value=-40,
            )
            self.vcTransform = gr.Number(
                label="升降调(整数,可以正负,半音数量,升高八度就是12)",
                value=0,
            )
            self.vcSubmit = gr.Button("转换", variant="primary")

            # ---- outputs ------------------------------------------------
            self.outVcText = gr.Textbox(
                label="音高平均偏差半音数量,体现转换音频的跑调情况(一般小于0.5)",
            )
            self.outAudio = gr.Audio(
                source="upload",
                type="numpy",
                label="Output Audio",
            )
            self.f0_image = gr.Image(
                label="f0曲线,蓝色为输入音高,橙色为合成音频的音高(代码有误差)",
            )
            gr.Markdown(value="""
## 注意
如果要在本地使用该demo,请使用 `git lfs clone https://huggingface.co/spaces/yukie/yukie-sovits3`克隆该仓库([简单教程](https://huggingface.co/spaces/yukie/yukie-sovits3/edit/main/local.md))
""")

            # The order of these lists must match infer's parameters and
            # its returned tuple respectively.
            conversion_inputs = [
                self.inMic,
                self.inAudio,
                self.vcTransform,
                self.slice_db,
                self.needLogmmse,
                self.sid,
                self.dev,
            ]
            conversion_outputs = [self.outVcText, self.outAudio, self.f0_image]
            self.vcSubmit.click(
                infer,
                inputs=conversion_inputs,
                outputs=conversion_outputs,
                api_name="go",
            )
def download_audio(url):
    """Download an audio file and return it as 16-bit PCM samples.

    Args:
        url: HTTP(S) address of an audio file that librosa can decode.

    Returns:
        A ``(sample_rate, samples)`` tuple. ``sample_rate`` is the file's own
        rate (``sr=None`` disables resampling) and ``samples`` is an ``int16``
        NumPy array scaled so that +/-1.0 maps to +/-32767.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.RequestException: on connection failure or timeout.
    """
    # Fail early on HTTP errors instead of handing an error page to the
    # decoder, and never block forever on a stalled connection.
    response = requests.get(url, timeout=60)
    response.raise_for_status()

    # librosa decodes to floating point samples; no intermediate WAV
    # round-trip is needed before the integer conversion.
    samples, sample_rate = librosa.load(BytesIO(response.content), sr=None)

    # Clip first so out-of-range samples saturate rather than wrap around
    # when cast to int16.
    samples = np.clip(samples, -1.0, 1.0)
    return sample_rate, np.asarray(samples * 32767, dtype=np.int16)
def downloadTubeUpload(query):
    """Find a short video for ``query``, cut a 60 s excerpt and upload it.

    Steps:
      1. Call the search API and take the first hit shorter than 10 minutes.
      2. Fetch that video's formats (falling back to the staging host) and
         pick the audio-only mp4 stream with the highest bitrate.
      3. Download it to ``temp.mp4`` with ``./pget`` and copy 60 seconds
         starting at the midpoint into ``output.mp4`` with ffmpeg.
      4. Upload ``output.mp4`` to lalal.ai with curl.

    ``temp.mp4`` and ``output.mp4`` are left in the working directory.

    Args:
        query: free-text search string.

    Returns:
        The ``id`` field of the upload reply, or ``None`` when no suitable
        video or audio format was found.

    Raises:
        ValueError: if the duration of the downloaded file cannot be read
            (usually because the download failed) or the upload reply is not
            valid JSON.
        OSError: if one of the external tools cannot be started.
        requests.RequestException: on network failure or timeout.
    """
    # Local import: the module header only imports ``getoutput`` from
    # subprocess.
    import subprocess

    # Step 1: search and choose a video.
    search_url = f"https://draw-8fj.begin.app/api/search/{quote(query)}"
    search_response = requests.get(search_url, timeout=60).json()
    print('1=>', search_response)
    if not isinstance(search_response, list):
        # An error payload instead of a result list: nothing to pick from.
        return None
    video_id = _first_short_video_id(search_response)
    print('1-r', video_id)
    if not video_id:
        return None

    # Step 2: list formats, retrying on the staging host if the primary fails.
    formats = requests.get(
        f"https://draw-8fj.begin.app/api/info/{video_id}", timeout=60)
    if not formats.ok:
        formats = requests.get(
            f"https://draw-8fj-staging.begin.app/api/info/{video_id}",
            timeout=60)
    available_formats = formats.json().get("formats", [])
    print(available_formats)
    best_audio_format = _best_audio_only_mp4(available_formats)
    print(best_audio_format)
    if not best_audio_format:
        return None

    # Step 3: download and trim. Argument lists (no shell) keep the remote
    # supplied URL from being interpreted as shell syntax.
    aurl = best_audio_format["url"]
    print(aurl)
    subprocess.run(["./pget", "-o", "temp.mp4", "-p", "4", aurl], check=False)

    probe = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", "temp.mp4"],
        capture_output=True, text=True, check=False)
    try:
        duration = float(probe.stdout.strip())
    except ValueError:
        raise ValueError(
            "could not read the duration of temp.mp4; the download probably "
            "failed") from None

    # Take the excerpt from the middle of the track.
    start_time = max(0, duration / 2)
    # -y overwrites an output.mp4 left behind by an earlier aborted run
    # instead of stopping to ask for confirmation.
    subprocess.run(
        ["ffmpeg", "-y", "-i", "temp.mp4", "-ss", str(start_time),
         "-t", "60", "-c", "copy", "output.mp4"],
        check=False)

    # Step 4: upload the excerpt and return the id from the JSON reply.
    upload = subprocess.run(
        ["curl", "--url", "https://www.lalal.ai/api/upload/",
         "--data-binary", "@output.mp4",
         "--header", "Content-Disposition: attachment; filename=output.mp4",
         "-s"],
        capture_output=True, text=True, check=False)
    moutput = upload.stdout.strip()
    print(moutput)
    upload_response = json.loads(moutput)
    return upload_response.get("id")


def _first_short_video_id(results):
    """Return the videoId of the first result shorter than ten minutes, else None."""
    for item in results:
        duration = item.get("duration_raw")
        if not duration:
            continue
        parts = duration.split(':')
        # Fewer than three fields means there is no hours component; the
        # first field is then the minutes.
        if len(parts) < 3 and parts[0].isdigit() and int(parts[0]) < 10:
            return item.get("id", {}).get("videoId")
    return None


def _best_audio_only_mp4(formats):
    """Return the audio-only mp4 format with the highest bitrate, else None."""
    candidates = [
        fmt for fmt in formats
        if fmt.get("hasVideo") is False
        and fmt.get("hasAudio") is True
        and fmt.get("container") == "mp4"
    ]
    if not candidates:
        return None
    # A missing bitrate counts as 0; max() keeps the earliest entry on ties.
    return max(candidates, key=lambda fmt: fmt.get("audioBitrate") or 0)
def split_file(file_id):
    """Delete the temporary media files and request a preview split.

    Removes ``temp.mp4`` and ``output.mp4`` from the working directory
    (best effort), then posts ``file_id`` to the lalal.ai preview endpoint
    with the "phoenix" splitter.

    Args:
        file_id: id of a previously uploaded file.

    Raises:
        RuntimeError: if the API reply has status "error".
        requests.RequestException: on network failure or timeout.
    """
    # Remove the files directly rather than spawning ``rm`` through a shell;
    # cleanup stays best effort, so a missing file is not an error.
    for leftover in ("temp.mp4", "output.mp4"):
        try:
            os.remove(leftover)
        except FileNotFoundError:
            pass
        except OSError as exc:
            print(f"could not remove {leftover}: {exc}")

    url_for_split = "https://www.lalal.ai/api/preview/"
    query_args = {'id': file_id, 'splitter': "phoenix"}
    response = requests.post(url_for_split, data=query_args, timeout=60)
    split_result = response.json()
    if split_result["status"] == "error":
        raise RuntimeError(split_result.get("error", "unknown error"))
def check_file(file_id, poll_interval=30, max_wait=None):
    """Poll lalal.ai until the split of ``file_id`` has finished.

    Args:
        file_id: id of the uploaded file whose split was requested.
        poll_interval: seconds to sleep between polls (default 30).
        max_wait: optional upper bound in seconds for the whole wait;
            ``None`` (the default) waits indefinitely.

    Returns:
        A ``(stem_track_url, back_track_url)`` tuple taken from the
        ``preview`` section of the reply once the task state is "success".

    Raises:
        RuntimeError: if the reply or the task reports an error.
        TimeoutError: if ``max_wait`` elapses before the task succeeds.
        requests.RequestException: on network failure or timeout.
    """
    url_for_check = "https://www.lalal.ai/api/check/"
    query_args = {'id': file_id}
    is_queueup = False
    deadline = None if max_wait is None else time.monotonic() + max_wait

    while True:
        response = requests.get(url_for_check, params=query_args, timeout=60)
        check_result = response.json()
        if check_result["status"] == "error":
            raise RuntimeError(check_result["error"])

        task = check_result["task"]
        task_state = task["state"]
        if task_state == "error":
            raise RuntimeError(task["error"])
        if task_state == "success":
            preview = check_result["preview"]
            return preview["stem_track"], preview["back_track"]
        if task_state == "progress":
            progress = int(task["progress"])
            if progress == 0 and not is_queueup:
                # Report the queued state only once.
                print("Queue up...")
                is_queueup = True
            elif progress > 0:
                print(f"Progress: {progress}%")

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"split of {file_id} did not finish within {max_wait} seconds")
        time.sleep(poll_interval)
def infer(inMic, inAudio, transform, slice_db, lm, sid, dev):
    """Run voice conversion on an uploaded, downloaded or searched-for clip.

    Input selection: an uploaded clip wins; otherwise ``inMic`` is used. Text
    starting with "http" is downloaded directly, anything else is treated as
    a search string that goes through downloadTubeUpload / split_file /
    check_file, after which the returned stem track is downloaded.

    Args:
        inMic: URL or search string from the text box (may be empty/None).
        inAudio: ``(sample_rate, samples)`` tuple from the audio widget, or
            ``None`` when nothing was uploaded.
        transform: pitch-shift value forwarded to the model.
        slice_db: threshold forwarded to ``slicer.cut`` as ``db_thresh``.
        lm: when truthy, run logmmse on the clip before slicing.
        sid: voice name; must be a key of ``sid_map``.
        dev: device string forwarded to ``Svc``.

    Returns:
        A 3-tuple matching the three UI outputs: status text,
        ``(32000, int16 samples)`` and an image update pointing at
        "temp.jpg". When no usable input is available the last two entries
        are ``None``.

    Raises:
        KeyError: if ``sid`` is not in ``sid_map``.
        ValueError: if ``sid`` maps to a model key this function cannot load.
    """
    if inAudio is not None:
        sampling_rate, inaudio = inAudio
    elif inMic and inMic.strip():
        # An empty text box yields "", which must not be sent to the search.
        source = inMic.strip()
        if source.startswith("http"):
            sampling_rate, inaudio = download_audio(source)
        else:
            file_id = downloadTubeUpload(source)
            if file_id is None:
                return "未找到可用的音频,请更换关键词后再次尝试", None, None
            split_file(file_id)
            stem_track_url, _ = check_file(file_id)
            sampling_rate, inaudio = download_audio(stem_track_url)
    else:
        # Three values are required because the click handler has three
        # outputs.
        return "请上传一段音频后再次尝试", None, None

    print("start inference")
    start_time = time.time()

    # Pre-processing: normalise to float32, mix down to mono and resample
    # to 32 kHz.
    if np.issubdtype(inaudio.dtype, np.integer):
        inaudio = (inaudio / np.iinfo(inaudio.dtype).max).astype(np.float32)
    else:
        # np.iinfo rejects floating point dtypes; such data needs no scaling.
        inaudio = inaudio.astype(np.float32)
    if inaudio.ndim > 1:
        inaudio = librosa.to_mono(inaudio.transpose(1, 0))
    if sampling_rate != 32000:
        inaudio = librosa.resample(
            inaudio, orig_sr=sampling_rate, target_sr=32000)
    if lm:
        inaudio = logmmse(inaudio, 32000)

    ori_wav_path = "tmp_ori.wav"
    soundfile.write(ori_wav_path, inaudio, 32000, format="wav")
    chunks = slicer.cut(ori_wav_path, db_thresh=slice_db)
    audio_data, audio_sr = slicer.chunks2audio(ori_wav_path, chunks)

    model_key = sid_map[sid]
    if model_key != "model_sing":
        # Fail with a clear message instead of a NameError on svc_model.
        raise ValueError(f"no checkpoint configured for model key {model_key!r}")
    # NOTE(review): the model is constructed on every request.
    svc_model = Svc(model_sing, config_name, dev=dev)

    pieces = []
    for slice_tag, data in audio_data:
        if slice_tag:
            # Flagged slices are not converted; they are replaced by zeros of
            # the equivalent length at the model's sample rate.
            length = int(np.ceil(len(data) / audio_sr * svc_model.target_sample))
            pieces.append(np.zeros(length))
            continue
        raw_path = io.BytesIO()
        soundfile.write(raw_path, data, audio_sr, format="wav")
        raw_path.seek(0)
        out_audio, _ = svc_model.infer("group", transform, raw_path)
        # assumes the model returns a 1-D tensor of samples — TODO confirm
        pieces.append(out_audio.cpu().numpy())

    # Join arrays once instead of growing a Python list sample by sample.
    merged = np.concatenate(pieces) if pieces else np.zeros(0)
    # Clip before the cast so a full-scale sample saturates instead of
    # wrapping to the opposite sign.
    audio = np.clip(merged * 32768.0, -32768, 32767).astype('int16')
    used_time = time.time() - start_time

    out_wav_path = "tmp.wav"
    soundfile.write(out_wav_path, audio, 32000, format="wav")
    mistake, var = svc_model.calc_error(ori_wav_path, out_wav_path, transform)
    # NOTE(review): the image returned below is read from "temp.jpg";
    # presumably f0_plt writes that file — confirm.
    svc_model.f0_plt(ori_wav_path, out_wav_path, transform)
    out_str = f"Success! total use time:{used_time}s\n半音偏差:{mistake}\n半音方差:{var}"
    return out_str, (32000, audio), gr.Image.update("temp.jpg")
if __name__ == "__main__":
    # Build the interface, then start the Gradio server when run as a script.
    app = YukieGradio()
    app.UI.launch()