rvc_webui / api.py
aryo100's picture
update api
c02702b
import os
import asyncio
import edge_tts
import soundfile as sf
import torch
import fairseq
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel
# from modules import models
from uuid import uuid4
import requests
from modules.core import preload
from modules.models import load_model
# ASGI application object; the @app.post / @app.get decorators in this module
# register their endpoints on it.
app = FastAPI()
# Invoke the project's preload hook once, at import time.
# NOTE(review): what preload() initialises lives in modules.core — confirm there.
preload()
# Registry of RVC voice models. Each entry gives the lookup key ("name"),
# a display label, and the checkpoint / index file locations.
path_models = [
    dict(
        name="zeta",
        label="Zeta",
        ckpt_path="weights/zet_test1.pth",
        index_path="weights/zet_test1.0.index",
    ),
]
# Catalogue of edge_tts voices: each entry holds the voice name, a display
# label, the gender and the language.
edge_tts_voices = [
    dict(zip(("name", "label", "gender", "language"), row))
    for row in (
        ("id-ID-GadisNeural", "Indonesian Female (Gadis)", "Female", "Indonesian"),
        ("id-ID-ArdiNeural", "Indonesian Male (Ardi)", "Male", "Indonesian"),
        ("en-US-JennyNeural", "English US Female (Jenny)", "Female", "English"),
        ("en-US-GuyNeural", "English US Male (Guy)", "Male", "English"),
        ("ja-JP-NanamiNeural", "Japanese Female (Nanami)", "Female", "Japanese"),
        ("ja-JP-KeitaNeural", "Japanese Male (Keita)", "Male", "Japanese"),
    )
]
# Base URL of the external Back4App TTS service, read from the environment;
# None when the variable is not set.
BACK4APP_TTS_URL = os.environ.get("BACK4APP_TTS_URL")
def _fetch_back4app_tts(text: str, voice: str, tts_wav: str) -> None:
    """Blocking worker: request synthesis from Back4App and save the audio to *tts_wav*."""
    if not BACK4APP_TTS_URL:
        # Without this guard the request would go to the literal URL "None/tts".
        raise HTTPException(
            status_code=500,
            detail="TTS error via Back4App: BACK4APP_TTS_URL is not configured",
        )

    # 1. Ask the service to synthesize the text.
    response = requests.post(
        f"{BACK4APP_TTS_URL}/tts",
        json={"text": text, "voice": voice},
        timeout=60,
    )
    if response.status_code != 200:
        raise HTTPException(status_code=500, detail=f"Back4App TTS failed: {response.text}")

    # 2. Take the file URL from the response and download it in chunks.
    tts_url = response.json()["file"]
    # The context manager releases the streamed connection even if writing fails;
    # the timeout keeps a stalled download from hanging forever.
    with requests.get(f"{BACK4APP_TTS_URL}{tts_url}", stream=True, timeout=60) as download:
        download.raise_for_status()
        with open(tts_wav, "wb") as f:
            for chunk in download.iter_content(8192):
                f.write(chunk)


async def generate_tts_with_back4app(text: str, voice: str, tts_wav: str):
    """Synthesize *text* with the Back4App TTS service and write the audio to *tts_wav*.

    The HTTP calls are made with the blocking ``requests`` library, so they are
    run in the default executor to keep the event loop free.

    Args:
        text: Text to synthesize.
        voice: Voice identifier sent to the service as ``"voice"``.
        tts_wav: Destination path for the downloaded audio file.

    Raises:
        HTTPException: Status 500 when the service is not configured, returns
            a non-200 status, or any request / file error occurs.
    """
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(None, _fetch_back4app_tts, text, voice, tts_wav)
    except HTTPException:
        # Already carries a precise message; do not wrap it a second time.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error via Back4App: {e}") from e
# Request body accepted by the POST /tts endpoint.
class TTSRequest(BaseModel):
    text: str  # text to synthesize
    name: str  # model name; must match the "name" of an entry in path_models
    tts_voice: str = "id-ID-GadisNeural"  # voice passed to edge_tts.Communicate
    f0_up_key: int = 0  # pitch shift forwarded to the voice-conversion call
def limit_tts_files(output_dir, max_files=10):
    """Delete the oldest files in *output_dir* until at most *max_files* remain.

    Only regular files are counted and removed; subdirectories are left alone.
    Files that vanish while the function runs (for example, removed by a
    concurrent request) are skipped instead of raising.

    Args:
        output_dir: Directory to prune. It must exist.
        max_files: Maximum number of files to keep. Values below zero are
            treated as zero.
    """
    dated_files = []
    with os.scandir(output_dir) as entries:
        for entry in entries:
            try:
                if entry.is_file():
                    dated_files.append((entry.stat().st_mtime, entry.path))
            except OSError:
                # Entry disappeared or is unreadable; nothing to prune here.
                continue

    excess = len(dated_files) - max(max_files, 0)
    if excess <= 0:
        return

    # Oldest modification time first, so the first `excess` entries are removed.
    dated_files.sort()
    for _, stale_path in dated_files[:excess]:
        try:
            os.remove(stale_path)
        except FileNotFoundError:
            # Already removed by someone else; the goal is met either way.
            pass
@app.post("/tts")
async def tts_api(req: TTSRequest):
    """Synthesize speech for ``req.text`` and convert it with the selected RVC model.

    Steps: look up the model in ``path_models``, generate speech with edge_tts
    into a temporary file, run voice conversion on it, write the converted
    audio under ``/tmp/tts`` and return a URL served by ``/file-tmp``.

    Returns:
        ``{"result": <url>}``; the URL is absolute when the ``SPACE_ID``
        environment variable is set, otherwise relative.

    Raises:
        HTTPException: 404 when the model or one of its files is missing;
            500 when TTS generation or voice conversion fails.
    """
    # Find the model by name.
    model = next((m for m in path_models if m["name"] == req.name), None)
    if not model:
        raise HTTPException(status_code=404, detail=f"Model '{req.name}' not found.")
    ckpt_path = model["ckpt_path"]
    index_path = model["index_path"]

    # Check that the model and index files exist.
    if not os.path.isfile(ckpt_path):
        raise HTTPException(status_code=404, detail=f"Model file not found: {ckpt_path}")
    if not os.path.isfile(index_path):
        raise HTTPException(status_code=404, detail=f"Index file not found: {index_path}")

    # Output paths; prune old files before creating new ones.
    output_dir = "/tmp/tts"
    os.makedirs(output_dir, exist_ok=True)
    limit_tts_files(output_dir, max_files=10)
    tts_wav = f"{output_dir}/{uuid4().hex}_tts.wav"
    output_wav = f"{output_dir}/{uuid4().hex}_rvc.wav"
    index_rate = 0.75

    # 1. Generate TTS with edge_tts, streaming the audio chunks to disk.
    #    Alternative backend (currently disabled):
    #    await generate_tts_with_back4app(req.text, req.tts_voice, tts_wav)
    try:
        communicate = edge_tts.Communicate(req.text, req.tts_voice)
        with open(tts_wav, "wb") as f:
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    f.write(chunk["data"])
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error: {e}") from e

    # 2. Voice conversion.
    # NOTE(review): load_model() and vc.single() are called synchronously inside
    # this coroutine, so the event loop is blocked while they run — consider
    # offloading them if concurrent requests matter.
    try:
        vc = load_model(ckpt_path, config_json="configs/48k-768.json")
        if vc is None:
            raise HTTPException(status_code=500, detail="RVC error: Failed to load model")

        result = vc.single(
            sid=0,                          # speaker id
            input_audio=tts_wav,            # input audio path
            embedder_model_name="auto",     # auto-detect embedder
            embedding_output_layer="auto",  # auto-detect layer
            f0_up_key=req.f0_up_key,        # pitch shift
            f0_file="",                     # f0 curve file (none)
            f0_method="harvest",            # f0 method
            auto_load_index=True,           # auto-load index
            faiss_index_file=index_path,    # index file path
            index_rate=index_rate,          # index rate
            output_dir=output_dir,          # output directory
        )

        # A successful call yields (info, (sample_rate, audio)); anything else
        # is treated as an error description.
        if not (
            isinstance(result, tuple)
            and len(result) == 2
            and isinstance(result[1], tuple)
        ):
            raise HTTPException(status_code=500, detail=f"RVC error: {result}")
        _info, (tgt_sr, audio_opt) = result
        sf.write(output_wav, audio_opt, tgt_sr)
    except HTTPException:
        # Already formatted for the client; wrapping it again would produce
        # "RVC error: 500: RVC error: ...".
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"RVC error: {e}") from e

    # Build the public URL from the Hugging Face environment when available.
    space_id = os.environ.get("SPACE_ID")
    if space_id:
        # Only the part before the first "/" is used; the host suffix is fixed.
        username = space_id.split("/", 1)[0]
        space_url = f"https://{username}-rvc-tts.hf.space"
        public_url = f"{space_url}/file-tmp?path={output_wav}"
    else:
        public_url = f"/file-tmp?path={output_wav}"
    return {"result": public_url}
@app.get("/file-tmp")
def get_tmp_file(path: str):
    """Serve a generated audio file from ``/tmp/tts``.

    Args:
        path: File path supplied by the client as a query parameter.

    Raises:
        HTTPException: 403 when *path* does not resolve to a location inside
            ``/tmp/tts``; 404 when the resolved path is not an existing file.
    """
    allowed_prefix = "/tmp/tts/"
    # Security: only paths under /tmp/tts may be requested.
    if not path.startswith(allowed_prefix):
        raise HTTPException(status_code=403, detail="Forbidden")

    # A prefix check alone accepts "/tmp/tts/../../etc/passwd" and symlinks that
    # point elsewhere, so compare canonical paths as well.
    try:
        allowed_root = os.path.realpath(allowed_prefix)
        resolved = os.path.realpath(path)
        inside_root = os.path.commonpath([allowed_root, resolved]) == allowed_root
    except (OSError, ValueError):
        # For example an embedded NUL byte in the requested path.
        inside_root = False
    if not inside_root:
        raise HTTPException(status_code=403, detail="Forbidden")

    if not os.path.isfile(resolved):
        raise HTTPException(status_code=404, detail="File not found")
    # Serve the canonical path that was actually validated.
    return FileResponse(resolved)
# Run with: uvicorn api_tts:app --reload  (use the module name matching this file, e.g. `api:app` for api.py)