import os import uuid import torch import asyncio import gradio as gr from fastapi import FastAPI, UploadFile, File, Form from fastapi.responses import FileResponse from TTS.api import TTS import uvicorn from collections import deque # ========================= # ✅ ACCEPT LICENSE # ========================= os.environ["COQUI_TOS_AGREED"] = "1" # ========================= # 🔥 LOAD MODEL ONCE # ========================= device = "cuda" if torch.cuda.is_available() else "cpu" print("🚀 Loading XTTS model...") tts = TTS( model_name="tts_models/multilingual/multi-dataset/xtts_v2", progress_bar=False ).to(device) print("✅ Model loaded!") # ========================= # 📁 OUTPUT DIR # ========================= OUTPUT_DIR = "outputs" os.makedirs(OUTPUT_DIR, exist_ok=True) # ========================= # ⚡ BATCH CONFIG # ========================= BATCH_SIZE = 3 BATCH_WAIT_TIME = 1 # seconds request_queue = deque() # ========================= # 🔥 BATCH WORKER # ========================= async def batch_worker(): print("🔥 Batch worker started...") while True: if len(request_queue) == 0: await asyncio.sleep(0.1) continue # Wait to collect batch await asyncio.sleep(BATCH_WAIT_TIME) batch = [] while len(request_queue) > 0 and len(batch) < BATCH_SIZE: batch.append(request_queue.popleft()) print(f"⚡ Processing batch of {len(batch)}") for item in batch: text, lang, audio_path, output_path, future = item try: tts.tts_to_file( text=text, speaker_wav=audio_path, language=lang, file_path=output_path, split_sentences=True ) future.set_result(output_path) except Exception as e: future.set_result(str(e)) # ========================= # FASTAPI # ========================= api = FastAPI() @api.on_event("startup") async def startup_event(): asyncio.create_task(batch_worker()) @api.post("/clone-voice/") async def clone_voice_api( text: str = Form(...), language: str = Form(...), audio: UploadFile = File(...) ): try: input_path = f"{OUTPUT_DIR}/{uuid.uuid4()}_in.wav" output_path = f"{OUTPUT_DIR}/{uuid.uuid4()}_out.wav" with open(input_path, "wb") as f: f.write(await audio.read()) loop = asyncio.get_event_loop() future = loop.create_future() request_queue.append((text, language, input_path, output_path, future)) result = await future if isinstance(result, str) and result.endswith(".wav"): return FileResponse(result, media_type="audio/wav") else: return {"error": result} except Exception as e: return {"error": str(e)} # ========================= # GRADIO UI # ========================= async def clone_voice_ui(audio_path, text, language): if audio_path is None: return "❌ Upload audio", None if text.strip() == "": return "❌ Enter text", None output_path = f"{OUTPUT_DIR}/{uuid.uuid4()}.wav" loop = asyncio.get_event_loop() future = loop.create_future() request_queue.append((text, language, audio_path, output_path, future)) result = await future if isinstance(result, str) and result.endswith(".wav"): return "✅ Done", result else: return f"❌ {result}", None with gr.Blocks(title="XTTS Voice Cloning (Batching)") as demo: gr.Markdown("# 🎤 XTTS Voice Cloning (Batch Mode)") audio_input = gr.Audio(type="filepath", label="Speaker Audio") text_input = gr.Textbox(label="Text") lang_input = gr.Textbox(value="en", label="Language") btn = gr.Button("Generate") status = gr.Textbox(label="Status") output_audio = gr.Audio(label="Generated Audio") btn.click( fn=clone_voice_ui, inputs=[audio_input, text_input, lang_input], outputs=[status, output_audio] ) # ✅ FIXED QUEUE (no concurrency_count) demo.queue(max_size=20) # ========================= # COMBINE APP # ========================= app = gr.mount_gradio_app(api, demo, path="/") # ========================= # RUN SERVER # ========================= if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)