"""FastAPI service: synthesize speech with edge-tts, then convert it with RVC.

Endpoints:
    POST /tts       -- run TTS + voice conversion, return a URL to the result.
    GET  /file-tmp  -- download a generated file from the TTS output directory.
"""

import asyncio
import contextlib
import os
from uuid import uuid4

import edge_tts
import fairseq
import requests
import soundfile as sf
import torch
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel

from modules.core import preload
from modules.models import load_model

app = FastAPI()
preload()

# Voice-conversion models that can be selected through TTSRequest.name.
path_models = [
    {
        "name": "zeta",
        "label": "Zeta",
        "ckpt_path": "weights/zet_test1.pth",
        "index_path": "weights/zet_test1.0.index",
    },
]

# edge-tts voices, each with its name, label, gender and language.
edge_tts_voices = [
    {"name": "id-ID-GadisNeural", "label": "Indonesian Female (Gadis)", "gender": "Female", "language": "Indonesian"},
    {"name": "id-ID-ArdiNeural", "label": "Indonesian Male (Ardi)", "gender": "Male", "language": "Indonesian"},
    {"name": "en-US-JennyNeural", "label": "English US Female (Jenny)", "gender": "Female", "language": "English"},
    {"name": "en-US-GuyNeural", "label": "English US Male (Guy)", "gender": "Male", "language": "English"},
    {"name": "ja-JP-NanamiNeural", "label": "Japanese Female (Nanami)", "gender": "Female", "language": "Japanese"},
    {"name": "ja-JP-KeitaNeural", "label": "Japanese Male (Keita)", "gender": "Male", "language": "Japanese"},
]

BACK4APP_TTS_URL = os.getenv("BACK4APP_TTS_URL")

# Directory that holds generated audio; /file-tmp only serves files from here.
TTS_OUTPUT_DIR = "/tmp/tts"
# Value passed as ``index_rate`` to the voice-conversion call.
INDEX_RATE = 0.75
# Timeout (seconds) for each HTTP call to the Back4App TTS service.
BACK4APP_TIMEOUT = 60


def _fetch_back4app_tts(text: str, voice: str, tts_wav: str) -> None:
    """Blocking part of the Back4App TTS call: request synthesis, then download.

    Raises:
        HTTPException: if the service URL is not configured or the synthesis
            request does not return HTTP 200.
        Exception: any ``requests``/I/O/JSON error is propagated to the caller.
    """
    if not BACK4APP_TTS_URL:
        # Without this check the request would silently go to "None/tts".
        raise HTTPException(
            status_code=500,
            detail="TTS error via Back4App: BACK4APP_TTS_URL is not configured",
        )

    response = requests.post(
        f"{BACK4APP_TTS_URL}/tts",
        json={"text": text, "voice": voice},
        timeout=BACK4APP_TIMEOUT,
    )
    if response.status_code != 200:
        raise HTTPException(status_code=500, detail=f"Back4App TTS failed: {response.text}")

    # The response body carries the (server-relative) URL of the audio file.
    tts_url = response.json()["file"]
    # Context manager closes the streamed connection even if writing fails.
    with requests.get(
        f"{BACK4APP_TTS_URL}{tts_url}", stream=True, timeout=BACK4APP_TIMEOUT
    ) as download:
        download.raise_for_status()
        with open(tts_wav, "wb") as f:
            for chunk in download.iter_content(8192):
                f.write(chunk)


async def generate_tts_with_back4app(text: str, voice: str, tts_wav: str):
    """Synthesize ``text`` through the Back4App TTS service into ``tts_wav``.

    The blocking ``requests`` calls run in the default executor so the event
    loop is not stalled while waiting on the network.

    Raises:
        HTTPException: status 500 on any failure.
    """
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(None, _fetch_back4app_tts, text, voice, tts_wav)
    except HTTPException:
        # Already a well-formed API error; do not wrap it a second time.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error via Back4App: {e}") from e


class TTSRequest(BaseModel):
    """Request body for POST /tts."""

    text: str
    name: str  # model name; must match a "name" entry in ``path_models``
    tts_voice: str = "id-ID-GadisNeural"
    f0_up_key: int = 0


def limit_tts_files(output_dir, max_files=10):
    """Delete the oldest files in ``output_dir`` until at most ``max_files`` remain.

    Only regular files are considered; sub-directories are left alone. Files
    that disappear while this runs (e.g. removed by a concurrent request) are
    ignored instead of raising.
    """
    candidates = []
    with os.scandir(output_dir) as entries:
        for entry in entries:
            try:
                if entry.is_file():
                    candidates.append((entry.stat().st_mtime, entry.path))
            except OSError:
                # Entry vanished or is unreadable; nothing to clean up.
                continue

    candidates.sort()  # oldest modification time first
    excess = max(len(candidates) - max_files, 0)
    for _mtime, path in candidates[:excess]:
        with contextlib.suppress(FileNotFoundError):
            os.remove(path)


async def _synthesize_speech(text: str, voice: str, destination: str) -> None:
    """Stream edge-tts audio for ``text`` into ``destination``.

    Raises:
        HTTPException: status 500 ("TTS error: ...") on any failure.
    """
    try:
        communicate = edge_tts.Communicate(text, voice)
        with open(destination, "wb") as f:
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    f.write(chunk["data"])
        # To use Back4App instead, replace the block above with:
        # await generate_tts_with_back4app(text, voice, destination)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error: {e}") from e


def _convert_voice(ckpt_path: str, index_path: str, input_wav: str,
                   output_wav: str, output_dir: str, f0_up_key: int) -> None:
    """Run RVC voice conversion on ``input_wav`` and write ``output_wav``.

    Raises:
        HTTPException: status 500 ("RVC error: ...") on any failure.
    """
    try:
        vc = load_model(ckpt_path, config_json="configs/48k-768.json")
        if vc is None:
            raise RuntimeError("Failed to load model")

        result = vc.single(
            sid=0,                           # speaker id
            input_audio=input_wav,           # input audio path
            embedder_model_name="auto",      # auto-detect embedder
            embedding_output_layer="auto",   # auto-detect layer
            f0_up_key=f0_up_key,             # pitch shift
            f0_file="",                      # f0 curve file (none)
            f0_method="harvest",             # f0 extraction method
            auto_load_index=True,            # auto-load index
            faiss_index_file=index_path,     # index file path
            index_rate=INDEX_RATE,           # index rate
            output_dir=output_dir,           # output directory
        )

        # Success is a ``(info, (sample_rate, audio))`` tuple; anything else is
        # treated as an error value and reported verbatim.
        is_success = (
            isinstance(result, tuple)
            and len(result) == 2
            and isinstance(result[1], tuple)
        )
        if not is_success:
            raise RuntimeError(f"{result}")

        _info, (tgt_sr, audio_opt) = result
        sf.write(output_wav, audio_opt, tgt_sr)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"RVC error: {e}") from e


def _build_public_url(output_wav: str) -> str:
    """Return the download URL for ``output_wav`` (absolute on Hugging Face)."""
    space_id = os.environ.get("SPACE_ID")
    if not space_id:
        return f"/file-tmp?path={output_wav}"

    # SPACE_ID looks like "<username>/<space_name>"; only the owner is needed.
    username = space_id.split("/")[0]
    # NOTE(review): the "-rvc-tts" suffix is hard-coded rather than derived
    # from the space name -- confirm this matches the deployed Space.
    space_url = f"https://{username}-rvc-tts.hf.space"
    return f"{space_url}/file-tmp?path={output_wav}"


@app.post("/tts")
async def tts_api(req: TTSRequest):
    """Synthesize ``req.text`` and convert it with the requested RVC model.

    Returns:
        ``{"result": <url>}`` where the URL points at ``/file-tmp`` for the
        converted audio file.

    Raises:
        HTTPException: 404 if the model or its files are missing, 500 if
            speech synthesis or voice conversion fails.
    """
    # Look up the model by name.
    model = next((m for m in path_models if m["name"] == req.name), None)
    if not model:
        raise HTTPException(status_code=404, detail=f"Model '{req.name}' not found.")

    ckpt_path = model["ckpt_path"]
    index_path = model["index_path"]

    # Make sure the model and index files exist.
    if not os.path.isfile(ckpt_path):
        raise HTTPException(status_code=404, detail=f"Model file not found: {ckpt_path}")
    if not os.path.isfile(index_path):
        raise HTTPException(status_code=404, detail=f"Index file not found: {index_path}")

    # Output paths.
    output_dir = TTS_OUTPUT_DIR
    os.makedirs(output_dir, exist_ok=True)
    limit_tts_files(output_dir, max_files=10)
    tts_wav = f"{output_dir}/{uuid4().hex}_tts.wav"
    output_wav = f"{output_dir}/{uuid4().hex}_rvc.wav"

    try:
        # 1. Generate TTS audio.
        await _synthesize_speech(req.text, req.tts_voice, tts_wav)
        # 2. Voice conversion.
        # NOTE(review): this runs synchronously on the event loop; it was left
        # that way because it is not visible here whether the model is safe to
        # use from several threads at once.
        _convert_voice(ckpt_path, index_path, tts_wav, output_wav, output_dir, req.f0_up_key)
    finally:
        # The intermediate TTS file (possibly partial on failure) is no longer
        # needed once conversion has finished or failed.
        with contextlib.suppress(OSError):
            os.remove(tts_wav)

    return {"result": _build_public_url(output_wav)}


@app.get("/file-tmp")
def get_tmp_file(path: str):
    """Serve a generated file, restricted to the TTS output directory.

    Raises:
        HTTPException: 403 if ``path`` is outside the output directory,
            404 if the file does not exist.
    """
    try:
        allowed_root = os.path.realpath(TTS_OUTPUT_DIR)
        resolved = os.path.realpath(path)
    except ValueError:
        # e.g. embedded NUL byte in the query parameter.
        raise HTTPException(status_code=403, detail="Forbidden") from None

    # Security: a plain prefix check is not enough, because
    # "/tmp/tts/../../etc/passwd" also starts with "/tmp/tts/". Resolve ".."
    # and symlinks first, then require the result to stay inside the directory.
    inside_output_dir = resolved.startswith(allowed_root + os.sep)
    if not path.startswith(f"{TTS_OUTPUT_DIR}/") or not inside_output_dir:
        raise HTTPException(status_code=403, detail="Forbidden")
    if not os.path.isfile(resolved):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(resolved)


# Run with: uvicorn api_tts:app --reload