|
|
import os |
|
|
import asyncio |
|
|
import edge_tts |
|
|
import soundfile as sf |
|
|
import torch |
|
|
import fairseq |
|
|
from fastapi import FastAPI, HTTPException |
|
|
from fastapi.responses import FileResponse |
|
|
from pydantic import BaseModel |
|
|
|
|
|
from uuid import uuid4 |
|
|
import requests |
|
|
from modules.core import preload |
|
|
from modules.models import load_model |
|
|
|
|
|
# ASGI application object; the route decorators further down register on it.
app = FastAPI()

# Project start-up hook from modules.core, invoked once at import time.
preload()
|
|
|
|
|
# Registry of voice-conversion models selectable through the ``name`` field of
# a /tts request. Each entry carries:
#   name       - identifier matched against the request
#   label      - human-readable display name
#   ckpt_path  - model checkpoint file
#   index_path - index file handed to the converter as ``faiss_index_file``
path_models = [
    {
        "name": "zeta",
        "label": "Zeta",
        "ckpt_path": "weights/zet_test1.pth",
        "index_path": "weights/zet_test1.0.index",
    },
]
|
|
|
|
|
|
|
|
# Catalogue of edge-tts voices known to this service. ``name`` is the voice
# identifier; ``label``, ``gender`` and ``language`` are descriptive metadata.
edge_tts_voices = [
    {
        "name": "id-ID-GadisNeural",
        "label": "Indonesian Female (Gadis)",
        "gender": "Female",
        "language": "Indonesian",
    },
    {
        "name": "id-ID-ArdiNeural",
        "label": "Indonesian Male (Ardi)",
        "gender": "Male",
        "language": "Indonesian",
    },
    {
        "name": "en-US-JennyNeural",
        "label": "English US Female (Jenny)",
        "gender": "Female",
        "language": "English",
    },
    {
        "name": "en-US-GuyNeural",
        "label": "English US Male (Guy)",
        "gender": "Male",
        "language": "English",
    },
    {
        "name": "ja-JP-NanamiNeural",
        "label": "Japanese Female (Nanami)",
        "gender": "Female",
        "language": "Japanese",
    },
    {
        "name": "ja-JP-KeitaNeural",
        "label": "Japanese Male (Keita)",
        "gender": "Male",
        "language": "Japanese",
    },
]
|
|
|
|
|
# Base URL of the remote Back4App TTS service, read from the environment once
# at import time; ``None`` when the variable is not set.
BACK4APP_TTS_URL = os.environ.get("BACK4APP_TTS_URL")
|
|
|
|
|
async def generate_tts_with_back4app(text: str, voice: str, tts_wav: str):
    """Synthesize *text* via the Back4App TTS service and save it to *tts_wav*.

    The service is first asked to render the text
    (``POST {BACK4APP_TTS_URL}/tts``). The ``"file"`` entry of its JSON reply
    is then appended to the base URL and downloaded in 8 KiB chunks.

    Args:
        text: Text to synthesize.
        voice: Voice identifier forwarded to the service.
        tts_wav: Local path the downloaded audio is written to.

    Raises:
        HTTPException: Status 500 when the service URL is not configured, the
            service answers with a non-200 status, or any network, JSON or
            file error occurs.
    """
    if not BACK4APP_TTS_URL:
        # Without this guard the request would be sent to the literal URL
        # "None/tts" and fail with a confusing connection error.
        raise HTTPException(
            status_code=500,
            detail="TTS error via Back4App: BACK4APP_TTS_URL is not configured",
        )

    # Tolerate a trailing slash in the configured base URL.
    base_url = BACK4APP_TTS_URL.rstrip("/")

    def _fetch_to_file():
        """Blocking worker: request synthesis, then stream the result to disk."""
        response = requests.post(
            f"{base_url}/tts",
            json={"text": text, "voice": voice},
            timeout=60,
        )
        if response.status_code != 200:
            raise HTTPException(
                status_code=500,
                detail=f"Back4App TTS failed: {response.text}",
            )
        tts_url = response.json()["file"]

        # The context manager releases the streamed connection even when the
        # download fails; the timeout keeps a stalled server from hanging us.
        with requests.get(f"{base_url}{tts_url}", stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(tts_wav, "wb") as f:
                for chunk in r.iter_content(8192):
                    f.write(chunk)

    try:
        # ``requests`` is synchronous; run it on the default executor so the
        # event loop keeps serving other requests in the meantime.
        await asyncio.get_running_loop().run_in_executor(None, _fetch_to_file)
    except HTTPException:
        # Already carries the intended detail; do not wrap it a second time.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"TTS error via Back4App: {e}"
        ) from e
|
|
|
|
|
class TTSRequest(BaseModel):
    """JSON body accepted by the ``POST /tts`` endpoint."""

    # Text to be spoken.
    text: str

    # ``name`` of the ``path_models`` entry to convert the speech with.
    name: str

    # edge-tts voice used for the intermediate speech synthesis.
    tts_voice: str = "id-ID-GadisNeural"

    # Forwarded unchanged as ``f0_up_key`` to the voice-conversion call.
    f0_up_key: int = 0
|
|
|
|
|
def limit_tts_files(output_dir, max_files=10):
    """Delete the oldest files in *output_dir* until at most *max_files* remain.

    Only regular files are counted and removed; subdirectories are left alone.
    Age is judged by modification time, oldest first.

    Args:
        output_dir: Directory to prune. It must exist.
        max_files: Maximum number of files to keep. Values below zero are
            treated as zero.
    """
    dated = []
    for entry in os.listdir(output_dir):
        full_path = os.path.join(output_dir, entry)
        if not os.path.isfile(full_path):
            # os.remove() cannot delete directories, so never consider them.
            continue
        try:
            dated.append((os.path.getmtime(full_path), full_path))
        except OSError:
            # The file vanished between listdir() and stat(), e.g. because a
            # concurrent request pruned it; nothing left to do for it.
            continue

    dated.sort()  # oldest modification time first
    excess = max(len(dated) - max(max_files, 0), 0)
    for _mtime, stale_path in dated[:excess]:
        try:
            os.remove(stale_path)
        except FileNotFoundError:
            # Already removed by someone else, which is the desired end state.
            pass
|
|
|
|
|
@app.post("/tts")
async def tts_api(req: TTSRequest):
    """Synthesize speech with edge-tts, convert it with an RVC model, return a URL.

    Steps:
      1. Look up ``req.name`` in ``path_models`` and check that its checkpoint
         and index files exist.
      2. Stream the edge-tts audio for ``req.text`` into a fresh file under
         ``/tmp/tts``.
      3. Load the model, run the conversion on that file and write the result
         to a second fresh file under ``/tmp/tts``.
      4. Return ``{"result": <url>}`` pointing at the ``/file-tmp`` endpoint.

    Args:
        req: Parsed request body.

    Raises:
        HTTPException: 404 when the model name is unknown or its files are
            missing; 500 when synthesis or conversion fails.
    """
    model = next((m for m in path_models if m["name"] == req.name), None)
    if not model:
        raise HTTPException(status_code=404, detail=f"Model '{req.name}' not found.")

    ckpt_path = model["ckpt_path"]
    index_path = model["index_path"]

    if not os.path.isfile(ckpt_path):
        raise HTTPException(status_code=404, detail=f"Model file not found: {ckpt_path}")
    if not os.path.isfile(index_path):
        raise HTTPException(status_code=404, detail=f"Index file not found: {index_path}")

    output_dir = "/tmp/tts"
    os.makedirs(output_dir, exist_ok=True)
    # Prune old results before adding the two files this request produces.
    limit_tts_files(output_dir, max_files=10)
    tts_wav = f"{output_dir}/{uuid4().hex}_tts.wav"
    output_wav = f"{output_dir}/{uuid4().hex}_rvc.wav"
    index_rate = 0.75

    # Step 2: text -> speech.
    # NOTE(review): edge-tts audio chunks are presumably MP3 data even though
    # the file is named *.wav; confirm the converter's loader copes with that.
    try:
        communicate = edge_tts.Communicate(req.text, req.tts_voice)
        with open(tts_wav, "wb") as f:
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    f.write(chunk["data"])
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error: {e}") from e

    # Step 3: speech -> converted voice.
    # NOTE(review): model loading and inference run synchronously inside this
    # coroutine and therefore block the event loop while they execute.
    try:
        vc = load_model(ckpt_path, config_json="configs/48k-768.json")
        if vc is None:
            raise RuntimeError("Failed to load model")

        result = vc.single(
            sid=0,
            input_audio=tts_wav,
            embedder_model_name="auto",
            embedding_output_layer="auto",
            f0_up_key=req.f0_up_key,
            f0_file="",
            f0_method="harvest",
            auto_load_index=True,
            faiss_index_file=index_path,
            index_rate=index_rate,
            output_dir=output_dir,
        )

        # A successful call is unpacked as (info, (sample_rate, audio)); any
        # other shape is reported verbatim as the converter's error.
        well_formed = (
            isinstance(result, tuple)
            and len(result) == 2
            and isinstance(result[1], tuple)
            and len(result[1]) == 2
        )
        if not well_formed:
            raise HTTPException(status_code=500, detail=f"RVC error: {result}")

        _info, (tgt_sr, audio_opt) = result
        sf.write(output_wav, audio_opt, tgt_sr)
    except HTTPException:
        # Keep the detail built above instead of wrapping it a second time.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"RVC error: {e}") from e

    # Step 4: build the download URL.
    space_id = os.environ.get("SPACE_ID")
    if space_id:
        # Only the part before the first "/" is used; partition() never raises,
        # unlike tuple-unpacking split("/") on an unexpectedly shaped value.
        # NOTE(review): the host name hard-codes "rvc-tts" rather than using
        # the space-name half of SPACE_ID; confirm this is intended.
        username, _, _ = space_id.partition("/")
        space_url = f"https://{username}-rvc-tts.hf.space"
        public_url = f"{space_url}/file-tmp?path={output_wav}"
    else:
        public_url = f"/file-tmp?path={output_wav}"

    return {"result": public_url}
|
|
|
|
|
@app.get("/file-tmp")
def get_tmp_file(path: str):
    """Serve a generated audio file from ``/tmp/tts``.

    Args:
        path: Absolute path of the file, as handed out by the ``/tts``
            endpoint. It must start with ``/tmp/tts/`` and, once symlinks and
            ``..`` segments are resolved, still lie inside that directory.

    Raises:
        HTTPException: 403 when the path is outside ``/tmp/tts``; 404 when it
            does not name an existing regular file.
    """
    allowed_prefix = "/tmp/tts/"
    if not path.startswith(allowed_prefix):
        raise HTTPException(status_code=403, detail="Forbidden")

    # A bare prefix test is not sufficient: "/tmp/tts/../../etc/passwd" passes
    # it. Resolve the path and verify it is still inside the directory.
    base_dir = os.path.realpath(allowed_prefix)
    try:
        resolved = os.path.realpath(path)
    except ValueError:
        # e.g. an embedded NUL byte in the query parameter
        raise HTTPException(status_code=403, detail="Forbidden") from None
    if not resolved.startswith(base_dir + os.sep):
        raise HTTPException(status_code=403, detail="Forbidden")

    if not os.path.isfile(resolved):
        raise HTTPException(status_code=404, detail="File not found")

    # Serve the resolved path so the file that was checked is the file sent.
    return FileResponse(resolved)
|
|
|
|
|
|