rvc_api / api.py
aryo100's picture
update audio output debug
23104aa
import os
import asyncio
import edge_tts
import soundfile as sf
import torch
import fairseq
import numpy as np
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel
from modules import models
from uuid import uuid4
import requests
from modules.core import preload
from modules.models import load_model
# FastAPI application instance; the route decorators further down register on it.
app = FastAPI()
# Run the project's preload hook (imported from modules.core) once at import time.
# NOTE(review): what preload() actually initializes is not visible from this file — confirm.
preload()
# Registry of available RVC voice models. Each entry maps a public model
# "name" to its checkpoint file and its index file; tts_api joins both
# file names onto the "weights" directory.
# A former "zeta" entry pointing at zet_test1.pth / zet_test1.0.index
# (with label "Zeta") is disabled.
path_models = [
    {"name": model_name, "ckpt_path": checkpoint, "index_path": index_file}
    for model_name, checkpoint, index_file in (
        ("zeta", "zeta.pth", "added_IVF409_Flat_nprobe_1.index"),
        ("kobov2", "kobov2.pth", "added_IVF454_Flat_nprobe_1_kobov2_v2.index"),
        ("chamber", "Chamber.pth", "added_IVF746_Flat_nprobe_1_Chamber_v2.index"),
    )
]
# Catalogue of edge_tts voices: voice id ("name"), display label, gender
# and language for each supported voice.
edge_tts_voices = [
    {"name": voice_id, "label": display, "gender": gender, "language": language}
    for voice_id, display, gender, language in (
        ("id-ID-GadisNeural", "Indonesian Female (Gadis)", "Female", "Indonesian"),
        ("id-ID-ArdiNeural", "Indonesian Male (Ardi)", "Male", "Indonesian"),
        ("en-US-JennyNeural", "English US Female (Jenny)", "Female", "English"),
        ("en-US-GuyNeural", "English US Male (Guy)", "Male", "English"),
        ("ja-JP-NanamiNeural", "Japanese Female (Nanami)", "Female", "Japanese"),
        ("ja-JP-KeitaNeural", "Japanese Male (Keita)", "Male", "Japanese"),
    )
]
# Base URLs of the external TTS backends, read from the environment.
# Either value is None when its variable is not set.
BACK4APP_TTS_URL = os.environ.get("BACK4APP_TTS_URL")
HF_SPACE_TTS_URL = os.environ.get("HF_SPACE_TTS_URL")
def _download_hf_space_tts(text: str, speaker: str, tts_wav: str) -> None:
    """Blocking worker: request synthesis from the HF Space and save the audio to *tts_wav*."""
    if not HF_SPACE_TTS_URL:
        # Without this guard the request would go to the literal URL "None/api/tts".
        raise HTTPException(status_code=500, detail="HF_SPACE_TTS_URL is not configured")

    # 1. Send the synthesis request to the TTS API.
    response = requests.post(
        f"{HF_SPACE_TTS_URL}/api/tts",
        json={"text": text, "speaker": speaker},
        headers={"Content-Type": "application/json"},
        timeout=60,
    )
    if response.status_code != 200:
        raise HTTPException(
            status_code=500,
            detail=f"HF Space TTS failed: {response.status_code} - {response.text}",
        )
    data = response.json()

    # 2. Validate the response payload.
    if not data.get("success"):
        raise HTTPException(
            status_code=500,
            detail=f"TTS API returned error: {data.get('message', 'Unknown error')}",
        )

    # 3. Extract the download URL.
    download_url = data.get("download_url")
    if not download_url:
        raise HTTPException(status_code=500, detail="Response missing download_url")

    # 4. A relative download URL is resolved against the Space base URL.
    if download_url.startswith("/"):
        full_download_url = f"{HF_SPACE_TTS_URL}{download_url}"
    else:
        full_download_url = download_url

    # 5. Stream the audio to disk; the context manager releases the
    #    connection even when the download fails midway.
    with requests.get(full_download_url, stream=True, timeout=60) as audio:
        audio.raise_for_status()
        with open(tts_wav, "wb") as f:
            for chunk in audio.iter_content(8192):
                f.write(chunk)


async def generate_tts_with_hf_space(text: str, speaker: str, tts_wav: str):
    """Generate TTS audio through the Hugging Face Space API.

    The blocking HTTP work runs in the default executor so the event loop
    stays responsive while waiting on the remote service.

    Args:
        text: Text to convert to speech.
        speaker: Speaker name (e.g. "gadis").
        tts_wav: Output file path where the audio is saved.

    Raises:
        HTTPException: status 500 when the backend is not configured, the
            request fails, the payload is invalid, or the download fails.
    """
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(None, _download_hf_space_tts, text, speaker, tts_wav)
    except HTTPException:
        # Already carries a precise detail; re-wrapping would mangle it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error via HF Space: {e}") from e
def _download_back4app_tts(text: str, voice: str, tts_wav: str) -> None:
    """Blocking worker: request synthesis from the Back4App service and save the audio to *tts_wav*."""
    if not BACK4APP_TTS_URL:
        # Without this guard the request would go to the literal URL "None/tts".
        raise HTTPException(status_code=500, detail="BACK4APP_TTS_URL is not configured")

    # 1. Send the synthesis request.
    response = requests.post(
        f"{BACK4APP_TTS_URL}/tts",
        json={"text": text, "voice": voice},
        timeout=60,
    )
    if response.status_code != 200:
        raise HTTPException(status_code=500, detail=f"Back4App TTS failed: {response.text}")
    data = response.json()

    # 2. Extract the file URL; it is appended to the service base URL.
    tts_url = data.get("file")
    if not tts_url:
        raise HTTPException(status_code=500, detail="Back4App TTS response missing 'file'")

    # 3. Stream the audio to disk, with a timeout and guaranteed connection release.
    with requests.get(f"{BACK4APP_TTS_URL}{tts_url}", stream=True, timeout=60) as audio:
        audio.raise_for_status()
        with open(tts_wav, "wb") as f:
            for chunk in audio.iter_content(8192):
                f.write(chunk)


async def generate_tts_with_back4app(text: str, voice: str, tts_wav: str):
    """Generate TTS audio through the Back4App TTS service.

    The blocking HTTP work runs in the default executor so the event loop
    stays responsive while waiting on the remote service.

    Args:
        text: Text to convert to speech.
        voice: Voice identifier sent to the service as "voice".
        tts_wav: Output file path where the audio is saved.

    Raises:
        HTTPException: status 500 when the backend is not configured, the
            request fails, the payload lacks "file", or the download fails.
    """
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(None, _download_back4app_tts, text, voice, tts_wav)
    except HTTPException:
        # Already carries a precise detail; re-wrapping would mangle it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error via Back4App: {e}") from e
class TTSRequest(BaseModel):
    # Request body of the POST /tts endpoint.

    # Text to synthesize.
    text: str
    # Model name; tts_api looks it up among the "name" values in path_models.
    name: str
    # Voice identifier handed to the TTS backend.
    # NOTE(review): the default is an edge_tts voice id, but tts_api forwards
    # this value as `speaker` to generate_tts_with_hf_space, whose docstring
    # example is "gadis" — confirm the HF Space accepts this format.
    tts_voice: str = "id-ID-GadisNeural"
    # Pitch shift forwarded to the voice-conversion call as f0_up_key.
    f0_up_key: int = 0
def limit_tts_files(output_dir, max_files=10):
    """Delete the oldest files in *output_dir* until at most *max_files* remain.

    Only regular files are counted and removed; sub-directories are left
    alone. Files that vanish while the function runs (e.g. removed by a
    concurrent request) are skipped instead of raising.

    Args:
        output_dir: Directory to prune. Must exist.
        max_files: Maximum number of files to keep (the newest by mtime).
    """
    dated_files = []
    with os.scandir(output_dir) as entries:
        for entry in entries:
            try:
                if entry.is_file():
                    dated_files.append((entry.stat().st_mtime, entry.path))
            except FileNotFoundError:
                # Deleted between listing and stat; nothing left to prune.
                continue

    dated_files.sort()  # oldest first
    excess = max(len(dated_files) - max_files, 0)
    for _, stale_path in dated_files[:excess]:
        try:
            os.remove(stale_path)
        except FileNotFoundError:
            # Another request already removed it.
            pass
@app.post("/tts")
async def tts_api(req: TTSRequest):
    """Synthesize speech for the request text and convert it with the selected RVC model.

    Steps: look up the model entry in ``path_models``, generate a TTS WAV
    with ``generate_tts_with_hf_space``, run voice conversion through the
    loaded model's ``single()`` method, write the result under
    ``/app/outputs`` and return a ``/file-tmp`` URL pointing at it.

    Args:
        req: Request body (text, model name, TTS voice, pitch shift).

    Returns:
        ``{"result": url}`` where ``url`` is absolute when the ``SPACE_ID``
        environment variable is set and relative otherwise.

    Raises:
        HTTPException: 404 when the model name is unknown or its checkpoint
            or index file is missing; 500 when TTS generation or voice
            conversion fails.
    """
    # Look up the model by name.
    model = next((m for m in path_models if m["name"] == req.name), None)
    if not model:
        raise HTTPException(status_code=404, detail=f"Model '{req.name}' not found.")
    ckpt_path = os.path.join("weights", model["ckpt_path"])
    index_path = os.path.join("weights", model["index_path"])

    # Check that the model and index files exist.
    if not os.path.isfile(ckpt_path):
        raise HTTPException(status_code=404, detail=f"Model file not found: {ckpt_path}")
    if not os.path.isfile(index_path):
        raise HTTPException(status_code=404, detail=f"Index file not found: {index_path}")

    # Output paths; prune old outputs before creating new ones.
    output_dir = "/app/outputs"
    os.makedirs(output_dir, exist_ok=True)
    limit_tts_files(output_dir, max_files=10)
    tts_wav = f"{output_dir}/{uuid4().hex}_tts.wav"
    output_wav = f"{output_dir}/{uuid4().hex}_rvc.wav"
    index_rate = 0.75

    # 1. Generate TTS. The HF Space backend is active; edge_tts streaming and
    #    generate_tts_with_back4app are the alternative backends.
    try:
        await generate_tts_with_hf_space(req.text, req.tts_voice, tts_wav)
        # Validate the TTS audio file.
        if not os.path.isfile(tts_wav):
            raise HTTPException(status_code=500, detail="TTS file was not created")
        if os.path.getsize(tts_wav) == 0:
            raise HTTPException(status_code=500, detail="TTS file is empty")
    except HTTPException:
        # Keep the precise detail instead of wrapping it in a second HTTPException.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error: {e}") from e

    # 2. Voice conversion.
    try:
        models.load_model(ckpt_path)
        vc = models.vc_model
        if vc is None:
            raise RuntimeError("Failed to load model")
        # Run the conversion through the model's single() method.
        audio_opt = vc.single(
            sid=0,  # speaker id
            input_audio=tts_wav,  # input audio path
            embedder_model_name="auto",  # auto-detect embedder
            embedding_output_layer="auto",  # auto-detect layer
            f0_up_key=req.f0_up_key,  # pitch shift
            f0_file="",  # f0 curve file (none)
            f0_method="harvest",  # f0 method ("pm" is the disabled alternative)
            auto_load_index=True,  # auto load index
            faiss_index_file=index_path,  # index file path
            index_rate=index_rate,  # index rate
            output_dir=output_dir,  # output directory
        )
        # Debug output describing the conversion result.
        print("input_audio : ", tts_wav)
        print("audio_opt type: ", type(audio_opt))
        print("audio_opt shape: ", audio_opt.shape if hasattr(audio_opt, 'shape') else 'N/A')
        print("audio_opt sample: ", audio_opt[:10] if hasattr(audio_opt, '__getitem__') else 'N/A')
        # The result must be a numpy array.
        if not isinstance(audio_opt, np.ndarray):
            raise HTTPException(status_code=500, detail=f"RVC error: Expected numpy array, got {type(audio_opt)}")
        # Reject empty or all-zero audio.
        if len(audio_opt) == 0 or np.all(audio_opt == 0):
            raise HTTPException(status_code=500, detail="RVC error: Generated audio is empty or all zeros. Check input audio and model configuration.")
        # Use the target sample rate exposed by the model.
        tgt_sr = vc.tgt_sr
        sf.write(output_wav, audio_opt, tgt_sr)
    except HTTPException:
        # Keep the precise detail instead of wrapping it in a second HTTPException.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"RVC exception error: {e}") from e

    # Build the public URL from the Hugging Face SPACE_ID when available.
    space_id = os.environ.get("SPACE_ID")
    if space_id:
        # Only the owner part is used; splitting once avoids an unpack error
        # on an unexpectedly shaped SPACE_ID.
        username = space_id.split("/", 1)[0]
        # NOTE(review): the "rvc-api" part of the host is hard-coded rather
        # than derived from the space name — confirm this is intended.
        space_url = f"https://{username}-rvc-api.hf.space"
        public_url = f"{space_url}/file-tmp?path={output_wav}"
    else:
        public_url = f"/file-tmp?path={output_wav}"
    return {"result": public_url}
@app.get("/file-tmp")
def get_tmp_file(path: str):
    """Serve a generated audio file located under ``/app/outputs``.

    Args:
        path: Absolute path of the requested file, as returned by the
            ``/tts`` endpoint.

    Returns:
        A ``FileResponse`` for the resolved file.

    Raises:
        HTTPException: 403 when the path is outside ``/app/outputs`` (also
            after resolving ``..`` segments and symlinks); 404 when the file
            does not exist.
    """
    # Security: only allow access to files inside /app/outputs.
    if not path.startswith("/app/outputs/"):
        raise HTTPException(status_code=403, detail="Forbidden")

    # A prefix check alone accepts "/app/outputs/../../etc/passwd"; resolve
    # the path first and verify that the result is still inside the root.
    allowed_root = os.path.realpath("/app/outputs")
    try:
        resolved = os.path.realpath(path)
    except (ValueError, OSError):
        # e.g. embedded NUL byte in the query parameter
        raise HTTPException(status_code=403, detail="Forbidden")
    if not resolved.startswith(allowed_root + os.sep):
        raise HTTPException(status_code=403, detail="Forbidden")

    if not os.path.isfile(resolved):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(resolved)
# Run with: uvicorn api_tts:app --reload