Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| import edge_tts | |
| import soundfile as sf | |
| import torch | |
| import fairseq | |
| import numpy as np | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
| from modules import models | |
| from uuid import uuid4 | |
| import requests | |
| from modules.core import preload | |
| from modules.models import load_model | |
# ASGI application object; this is what the server loads (e.g. `uvicorn api_tts:app`).
app = FastAPI()

# Project-level warm-up from modules.core, executed once at import time so it
# has already run before any request is handled.
preload()
# Registry of the voice-conversion models this API can serve.
# Each entry maps a public model name to its checkpoint file and its index
# file; tts_api joins both file names onto the "weights" directory.
#
# A test checkpoint ("zet_test1.pth" / "zet_test1.0.index", label "Zeta") was
# previously registered under the name "zeta"; it is disabled in favour of the
# first row below.
path_models = [
    {"name": model_name, "ckpt_path": ckpt_file, "index_path": index_file}
    for model_name, ckpt_file, index_file in (
        ("zeta", "zeta.pth", "added_IVF409_Flat_nprobe_1.index"),
        ("kobov2", "kobov2.pth", "added_IVF454_Flat_nprobe_1_kobov2_v2.index"),
        ("chamber", "Chamber.pth", "added_IVF746_Flat_nprobe_1_Chamber_v2.index"),
    )
]
# Catalogue of edge_tts voices, each described by name, label, gender and language.
edge_tts_voices = [
    {"name": voice_name, "label": voice_label, "gender": voice_gender, "language": voice_language}
    for voice_name, voice_label, voice_gender, voice_language in (
        ("id-ID-GadisNeural", "Indonesian Female (Gadis)", "Female", "Indonesian"),
        ("id-ID-ArdiNeural", "Indonesian Male (Ardi)", "Male", "Indonesian"),
        ("en-US-JennyNeural", "English US Female (Jenny)", "Female", "English"),
        ("en-US-GuyNeural", "English US Male (Guy)", "Male", "English"),
        ("ja-JP-NanamiNeural", "Japanese Female (Nanami)", "Female", "Japanese"),
        ("ja-JP-KeitaNeural", "Japanese Male (Keita)", "Male", "Japanese"),
    )
]
# Base URLs of the two remote TTS backends, read from the environment once at
# import time. Either value is None when its variable is not set.
BACK4APP_TTS_URL, HF_SPACE_TTS_URL = (
    os.environ.get(env_key) for env_key in ("BACK4APP_TTS_URL", "HF_SPACE_TTS_URL")
)
def _fetch_hf_space_tts(text: str, speaker: str, tts_wav: str) -> None:
    """Blocking worker: request speech from the HF Space and save it to *tts_wav*."""
    if not HF_SPACE_TTS_URL:
        # Without this guard the request would go to the literal URL "None/api/tts".
        raise HTTPException(status_code=500, detail="HF_SPACE_TTS_URL is not configured")

    try:
        # 1. Ask the TTS API to synthesize the text.
        response = requests.post(
            f"{HF_SPACE_TTS_URL}/api/tts",
            json={"text": text, "speaker": speaker},
            headers={"Content-Type": "application/json"},
            timeout=60,
        )
        if response.status_code != 200:
            raise HTTPException(
                status_code=500,
                detail=f"HF Space TTS failed: {response.status_code} - {response.text}",
            )
        data = response.json()

        # 2. Validate the JSON payload.
        if not data.get("success"):
            raise HTTPException(
                status_code=500,
                detail=f"TTS API returned error: {data.get('message', 'Unknown error')}",
            )

        # 3. Locate the generated audio.
        download_url = data.get("download_url")
        if not download_url:
            raise HTTPException(
                status_code=500,
                detail="Response missing download_url",
            )
        # A relative download URL is resolved against the Space base URL.
        if download_url.startswith("/"):
            full_download_url = f"{HF_SPACE_TTS_URL}{download_url}"
        else:
            full_download_url = download_url

        # 4. Stream the audio to disk. The context manager releases the
        #    connection even when the download or the write fails.
        with requests.get(full_download_url, stream=True, timeout=60) as audio:
            audio.raise_for_status()
            with open(tts_wav, "wb") as f:
                for chunk in audio.iter_content(8192):
                    f.write(chunk)
    except HTTPException:
        # Already carries a precise detail message; do not wrap it again.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error via HF Space: {e}") from e


async def generate_tts_with_hf_space(text: str, speaker: str, tts_wav: str):
    """
    Generate TTS audio through the Hugging Face Space API.

    The HTTP work is done with the blocking `requests` library, so it runs in
    the default executor to keep the event loop free for other requests.

    Args:
        text: Text to convert to speech.
        speaker: Speaker name (for example "gadis").
        tts_wav: Output file path where the audio is saved.

    Raises:
        HTTPException: status 500 when the backend URL is not configured, the
            API call fails, the response is invalid, or the download fails.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _fetch_hf_space_tts, text, speaker, tts_wav)
def _fetch_back4app_tts(text: str, voice: str, tts_wav: str) -> None:
    """Blocking worker: request speech from Back4App and save it to *tts_wav*."""
    if not BACK4APP_TTS_URL:
        # Without this guard the request would go to the literal URL "None/tts".
        raise HTTPException(status_code=500, detail="BACK4APP_TTS_URL is not configured")

    try:
        # 1. Ask the TTS service to synthesize the text.
        response = requests.post(
            f"{BACK4APP_TTS_URL}/tts",
            json={"text": text, "voice": voice},
            timeout=60,
        )
        if response.status_code != 200:
            raise HTTPException(status_code=500, detail=f"Back4App TTS failed: {response.text}")
        data = response.json()

        # 2. Take the file URL from the response; it is appended to the base URL.
        tts_url = data.get("file")
        if not tts_url:
            raise HTTPException(status_code=500, detail="Response missing file")

        # 3. Stream the audio to disk. The timeout keeps a stalled download
        #    from hanging the request forever.
        with requests.get(f"{BACK4APP_TTS_URL}{tts_url}", stream=True, timeout=60) as audio:
            audio.raise_for_status()
            with open(tts_wav, "wb") as f:
                for chunk in audio.iter_content(8192):
                    f.write(chunk)
    except HTTPException:
        # Already carries a precise detail message; do not wrap it again.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error via Back4App: {e}") from e


async def generate_tts_with_back4app(text: str, voice: str, tts_wav: str):
    """
    Generate TTS audio through the Back4App TTS service.

    The HTTP work is done with the blocking `requests` library, so it runs in
    the default executor to keep the event loop free for other requests.

    Args:
        text: Text to convert to speech.
        voice: Voice identifier forwarded to the service.
        tts_wav: Output file path where the audio is saved.

    Raises:
        HTTPException: status 500 when the backend URL is not configured, the
            service call fails, the response lacks a file URL, or the download
            fails.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _fetch_back4app_tts, text, voice, tts_wav)
class TTSRequest(BaseModel):
    """Request body accepted by tts_api."""

    # Text to synthesize.
    text: str
    # Model name; must match the "name" of an entry in path_models.
    name: str
    # Voice/speaker identifier handed to the TTS backend.
    tts_voice: str = "id-ID-GadisNeural"
    # Pitch-shift value passed to the voice-conversion step as f0_up_key.
    f0_up_key: int = 0
def limit_tts_files(output_dir, max_files=10):
    """
    Delete the oldest files in *output_dir* until at most *max_files* remain.

    Only regular files are counted and removed; subdirectories are left alone
    (calling os.remove on a directory would raise). Age is judged by
    modification time.

    Args:
        output_dir: Directory to prune. It must exist.
        max_files: Maximum number of files to keep. Values below zero are
            treated as zero.
    """
    dated_files = []
    with os.scandir(output_dir) as entries:
        for entry in entries:
            try:
                if entry.is_file():
                    dated_files.append((entry.stat().st_mtime, entry.path))
            except OSError:
                # The entry disappeared while we were scanning (for example a
                # concurrent request pruned it); nothing left to do for it.
                continue

    excess = len(dated_files) - max(max_files, 0)
    if excess <= 0:
        return

    dated_files.sort()  # oldest first
    for _, stale_path in dated_files[:excess]:
        try:
            os.remove(stale_path)
        except FileNotFoundError:
            # Already removed by someone else, which is the outcome we wanted.
            pass
def _resolve_model_files(name: str):
    """Return (ckpt_path, index_path) for the model registered as *name*, or raise 404."""
    model = next((m for m in path_models if m["name"] == name), None)
    if not model:
        raise HTTPException(status_code=404, detail=f"Model '{name}' not found.")

    ckpt_path = os.path.join("weights", model["ckpt_path"])
    index_path = os.path.join("weights", model["index_path"])
    if not os.path.isfile(ckpt_path):
        raise HTTPException(status_code=404, detail=f"Model file not found: {ckpt_path}")
    if not os.path.isfile(index_path):
        raise HTTPException(status_code=404, detail=f"Index file not found: {index_path}")
    return ckpt_path, index_path


def _public_result_url(output_wav: str) -> str:
    """Build the download link for *output_wav* (absolute on a HF Space, relative otherwise)."""
    space_id = os.environ.get("SPACE_ID")
    if not space_id:
        return f"/file-tmp?path={output_wav}"
    # Only the owner part of SPACE_ID is used; the "rvc-api" host suffix is
    # hard-coded. Taking the text before the first "/" (instead of unpacking
    # a two-way split) cannot raise on an unexpected SPACE_ID after the audio
    # has already been produced.
    username = space_id.split("/", 1)[0]
    return f"https://{username}-rvc-api.hf.space/file-tmp?path={output_wav}"


async def tts_api(req: TTSRequest):
    """
    Synthesize *req.text* to speech and convert it with the requested voice model.

    Steps: resolve the model files, generate a TTS wav through the HF Space
    backend, run voice conversion on that wav, write the converted audio into
    /app/outputs and return a link to it.

    Returns:
        dict: {"result": <URL of the converted wav>}.

    Raises:
        HTTPException: 404 when the model or one of its files is missing;
            500 when TTS generation or voice conversion fails.
    """
    ckpt_path, index_path = _resolve_model_files(req.name)

    # Output location; old results are pruned before new ones are written.
    output_dir = "/app/outputs"
    os.makedirs(output_dir, exist_ok=True)
    limit_tts_files(output_dir, max_files=10)
    tts_wav = f"{output_dir}/{uuid4().hex}_tts.wav"
    output_wav = f"{output_dir}/{uuid4().hex}_rvc.wav"
    index_rate = 0.75

    # 1. Generate TTS. The HF Space backend is the active one; the sibling
    #    generate_tts_with_back4app offers the same call shape.
    try:
        await generate_tts_with_hf_space(req.text, req.tts_voice, tts_wav)
        # Validate the TTS audio file.
        if not os.path.isfile(tts_wav):
            raise HTTPException(status_code=500, detail="TTS file was not created")
        if os.path.getsize(tts_wav) == 0:
            raise HTTPException(status_code=500, detail="TTS file is empty")
    except HTTPException:
        # Keep the precise detail instead of wrapping it in a second message.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"TTS error: {e}") from e

    # 2. Voice conversion.
    try:
        models.load_model(ckpt_path)
        vc = models.vc_model
        if vc is None:
            raise RuntimeError("Failed to load model")

        audio_opt = vc.single(
            sid=0,  # speaker id
            input_audio=tts_wav,  # input audio path
            embedder_model_name="auto",  # auto-detect embedder
            embedding_output_layer="auto",  # auto-detect layer
            f0_up_key=req.f0_up_key,  # pitch shift
            f0_file="",  # no f0 curve file
            f0_method="harvest",  # f0 method
            auto_load_index=True,  # auto-load index
            faiss_index_file=index_path,  # index file path
            index_rate=index_rate,  # index rate
            output_dir=output_dir,  # output directory
        )

        print("input_audio : ", tts_wav)
        print("audio_opt type: ", type(audio_opt))
        # Validate the type before touching shape or slicing, so an unexpected
        # return value yields the clear message below rather than a TypeError.
        if not isinstance(audio_opt, np.ndarray):
            raise HTTPException(status_code=500, detail=f"RVC error: Expected numpy array, got {type(audio_opt)}")
        print("audio_opt shape: ", audio_opt.shape)
        print("audio_opt sample: ", audio_opt[:10] if audio_opt.ndim else audio_opt)

        # Reject empty or silent (all-zero) output.
        if audio_opt.size == 0 or np.all(audio_opt == 0):
            raise HTTPException(status_code=500, detail="RVC error: Generated audio is empty or all zeros. Check input audio and model configuration.")

        # Write with the model's target sample rate.
        sf.write(output_wav, audio_opt, vc.tgt_sr)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"RVC exception error: {e}") from e

    return {"result": _public_result_url(output_wav)}
def get_tmp_file(path: str):
    """
    Serve a generated audio file from /app/outputs.

    Args:
        path: Absolute path of the requested file, as handed out in the
            `/file-tmp?path=...` links built by tts_api.

    Returns:
        FileResponse streaming the file.

    Raises:
        HTTPException: 403 when the path is outside /app/outputs, 404 when the
            file does not exist.
    """
    # Security: only files inside /app/outputs may be served. A bare prefix
    # check is not enough, because "/app/outputs/../../etc/passwd" passes it;
    # the path is therefore resolved ("..", symlinks) and checked again.
    if not path.startswith("/app/outputs/"):
        raise HTTPException(status_code=403, detail="Forbidden")
    outputs_root = os.path.realpath("/app/outputs")
    try:
        resolved = os.path.realpath(path)
    except ValueError:
        # e.g. an embedded NUL byte; such a path can never name a real file.
        raise HTTPException(status_code=404, detail="File not found") from None
    if not resolved.startswith(outputs_root + os.sep):
        raise HTTPException(status_code=403, detail="Forbidden")
    if not os.path.isfile(resolved):
        raise HTTPException(status_code=404, detail="File not found")
    # Serve the resolved path so the file that was checked is the file sent.
    return FileResponse(resolved)
# Run with: uvicorn api_tts:app --reload