| |
| import os |
| import uuid |
| import torch |
| import asyncio |
| import gradio as gr |
| from fastapi import FastAPI, UploadFile, File, Form |
| from fastapi.responses import FileResponse |
| from TTS.api import TTS |
| import uvicorn |
| from collections import deque |
|
|
| |
| |
| |
# Accept the Coqui model license non-interactively so the model download
# doesn't block waiting for a TTY prompt.
os.environ["COQUI_TOS_AGREED"] = "1"

# Prefer GPU when available; XTTS inference on CPU is very slow.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load XTTS v2 once at import time and reuse it for every request.
# NOTE: the original log strings here were mojibake'd (one literal was even
# split across two lines); replaced with plain ASCII text.
print("Loading XTTS model...")
tts = TTS(
    model_name="tts_models/multilingual/multi-dataset/xtts_v2",
    progress_bar=False
).to(device)
print("Model loaded!")

# Directory where uploaded reference clips and generated audio are written.
OUTPUT_DIR = "outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Micro-batching knobs: the worker waits BATCH_WAIT_TIME seconds after the
# first request arrives so more requests can accumulate, then drains up to
# BATCH_SIZE of them at once.
BATCH_SIZE = 3
BATCH_WAIT_TIME = 1  # seconds

# FIFO of pending jobs: (text, language, speaker_wav_path, output_path, future)
request_queue = deque()
|
|
| |
| |
| |
async def batch_worker():
    """Background task that drains the request queue in small batches.

    Each queued item is ``(text, language, speaker_wav_path, output_path,
    future)``.  The future is resolved with the output path on success or
    with the error-message string on failure; callers distinguish the two
    by checking for a ``.wav`` suffix.
    """
    print("Batch worker started...")

    while True:
        # Idle-poll until at least one request arrives.
        if not request_queue:
            await asyncio.sleep(0.1)
            continue

        # Let additional requests accumulate so they share this batch.
        await asyncio.sleep(BATCH_WAIT_TIME)

        batch = []
        while request_queue and len(batch) < BATCH_SIZE:
            batch.append(request_queue.popleft())

        print(f"Processing batch of {len(batch)}")

        for text, lang, audio_path, output_path, future in batch:
            try:
                # Run the blocking TTS call in a worker thread so the event
                # loop (and every FastAPI/Gradio handler) stays responsive
                # during synthesis instead of freezing for its full duration.
                await asyncio.to_thread(
                    tts.tts_to_file,
                    text=text,
                    speaker_wav=audio_path,
                    language=lang,
                    file_path=output_path,
                    split_sentences=True,
                )
                result = output_path
            except Exception as e:
                # Preserve the original contract: failures are delivered as
                # a plain error string, not by raising on the awaiter.
                result = str(e)

            # A cancelled/abandoned future (e.g. client disconnected) would
            # make set_result raise InvalidStateError — skip it instead.
            if not future.done():
                future.set_result(result)
|
|
|
|
| |
| |
| |
api = FastAPI()


@api.on_event("startup")
async def startup_event():
    """Launch the batching worker when the server starts."""
    # Keep a strong reference to the task: asyncio holds only weak
    # references to tasks, so an unreferenced background task may be
    # garbage-collected mid-flight and silently stop processing requests.
    api.state.batch_task = asyncio.create_task(batch_worker())
|
|
|
|
@api.post("/clone-voice/")
async def clone_voice_api(
    text: str = Form(...),
    language: str = Form(...),
    audio: UploadFile = File(...)
):
    """Clone the speaker in ``audio`` and synthesize ``text`` in ``language``.

    Returns the generated WAV file on success, or ``{"error": ...}`` with
    the failure message otherwise.
    """
    input_path = os.path.join(OUTPUT_DIR, f"{uuid.uuid4()}_in.wav")
    output_path = os.path.join(OUTPUT_DIR, f"{uuid.uuid4()}_out.wav")

    try:
        # Persist the uploaded reference clip for the TTS worker.
        with open(input_path, "wb") as f:
            f.write(await audio.read())

        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine (get_event_loop() is deprecated for this use).
        future = asyncio.get_running_loop().create_future()
        request_queue.append((text, language, input_path, output_path, future))

        # The batch worker resolves the future with the output path on
        # success or an error-message string on failure.
        result = await future

        if isinstance(result, str) and result.endswith(".wav"):
            return FileResponse(result, media_type="audio/wav")
        return {"error": result}
    except Exception as e:
        return {"error": str(e)}
    finally:
        # The reference clip is no longer needed once synthesis finished;
        # the original leaked one temp file per request.
        try:
            os.remove(input_path)
        except OSError:
            pass
|
|
|
|
| |
| |
| |
async def clone_voice_ui(audio_path, text, language):
    """Gradio click handler.

    Returns a ``(status_message, generated_audio_path)`` pair; the audio
    path is ``None`` whenever generation did not happen.
    """
    # Guard clauses for missing inputs.
    if audio_path is None:
        return "❌ Upload audio", None
    if not text.strip():
        return "❌ Enter text", None

    output_path = f"{OUTPUT_DIR}/{uuid.uuid4()}.wav"

    # get_running_loop() is the supported call inside a coroutine
    # (get_event_loop() is deprecated here).
    future = asyncio.get_running_loop().create_future()

    # Hand the job to the batch worker and wait for it to finish.
    request_queue.append((text, language, audio_path, output_path, future))
    result = await future

    # The worker resolves the future with the output path on success or an
    # error-message string on failure.  NOTE: the original success literal
    # was mojibake'd and split across two source lines (a syntax error);
    # the status markers are restored to the evidently intended emoji.
    if isinstance(result, str) and result.endswith(".wav"):
        return "✅ Done", result
    return f"❌ {result}", None
|
|
|
|
# --- Gradio front-end -------------------------------------------------------
# Minimal UI: upload a speaker clip, enter text and a language code, click
# Generate.  The click handler funnels through the same batch queue as the
# REST endpoint.
with gr.Blocks(title="XTTS Voice Cloning (Batching)") as demo:
    # NOTE(review): the heading below contains a mojibake'd character from a
    # bad encoding round-trip (presumably a microphone emoji) — confirm the
    # intended text before shipping.
    gr.Markdown("# π€ XTTS Voice Cloning (Batch Mode)")

    # Inputs: reference audio (passed to the handler as a file path), the
    # text to synthesize, and the ISO language code (defaults to English).
    audio_input = gr.Audio(type="filepath", label="Speaker Audio")
    text_input = gr.Textbox(label="Text")
    lang_input = gr.Textbox(value="en", label="Language")

    btn = gr.Button("Generate")

    # Outputs: human-readable status plus the generated audio player.
    status = gr.Textbox(label="Status")
    output_audio = gr.Audio(label="Generated Audio")

    btn.click(
        fn=clone_voice_ui,
        inputs=[audio_input, text_input, lang_input],
        outputs=[status, output_audio]
    )


# Cap pending UI events so the demo cannot be flooded with clicks.
demo.queue(max_size=20)


# Serve the Gradio UI at "/" on the same FastAPI app that exposes the
# /clone-voice/ REST endpoint.
app = gr.mount_gradio_app(api, demo, path="/")


# Run the combined app directly (UI + API on one uvicorn server).
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)